You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:08:07 UTC
svn commit: r1077370 [2/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src:
java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar 4 04:08:06 2011
@@ -17,262 +17,90 @@
*/
package org.apache.hadoop.mapred.gridmix;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.tools.rumen.JobStoryProducer;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
-import org.apache.hadoop.tools.rumen.TaskInfo;
-import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
/**
* Component generating random job traces for testing on a single node.
*/
-class DebugJobFactory extends JobFactory {
-
- public DebugJobFactory(JobSubmitter submitter, Path scratch, int numJobs,
- Configuration conf, CountDownLatch startFlag, UserResolver userResolver)
- throws IOException {
- super(submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
- startFlag, userResolver);
- }
-
- ArrayList<JobStory> getSubmitted() {
- return ((DebugJobProducer)jobProducer).submitted;
- }
+class DebugJobFactory {
- private static class DebugJobProducer implements JobStoryProducer {
- final ArrayList<JobStory> submitted;
- private final Configuration conf;
- private final AtomicInteger numJobs;
-
- public DebugJobProducer(int numJobs, Configuration conf) {
- super();
- this.conf = conf;
- this.numJobs = new AtomicInteger(numJobs);
- this.submitted = new ArrayList<JobStory>();
- }
-
- @Override
- public JobStory getNextJob() throws IOException {
- if (numJobs.getAndDecrement() > 0) {
- final MockJob ret = new MockJob(conf);
- submitted.add(ret);
- return ret;
- }
- return null;
- }
-
- @Override
- public void close() { }
+ interface Debuggable {
+ ArrayList<JobStory> getSubmitted();
}
- static double[] getDistr(Random r, double mindist, int size) {
- assert 0.0 <= mindist && mindist <= 1.0;
- final double min = mindist / size;
- final double rem = 1.0 - min * size;
- final double[] tmp = new double[size];
- for (int i = 0; i < tmp.length - 1; ++i) {
- tmp[i] = r.nextDouble() * rem;
- }
- tmp[tmp.length - 1] = rem;
- Arrays.sort(tmp);
+ public static JobFactory getFactory(
+ JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+ CountDownLatch startFlag,UserResolver resolver) throws IOException {
+ GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
+ conf, GridmixJobSubmissionPolicy.STRESS);
+ if (policy.name().equalsIgnoreCase("REPLAY")) {
+ return new DebugReplayJobFactory(
+ submitter, scratch, numJobs, conf, startFlag,resolver);
+ } else if (policy.name().equalsIgnoreCase("STRESS")) {
+ return new DebugStressJobFactory(
+ submitter, scratch, numJobs, conf, startFlag,resolver);
+ } else if (policy.name().equalsIgnoreCase("SERIAL")) {
+ return new DebugSerialJobFactory(
+ submitter, scratch, numJobs, conf, startFlag,resolver);
- final double[] ret = new double[size];
- ret[0] = tmp[0] + min;
- for (int i = 1; i < size; ++i) {
- ret[i] = tmp[i] - tmp[i-1] + min;
}
- return ret;
+ return null;
}
- /**
- * Generate random task data for a synthetic job.
- */
- static class MockJob implements JobStory {
-
- static final int MIN_REC = 1 << 14;
- static final int MIN_BYTES = 1 << 20;
- static final int VAR_REC = 1 << 14;
- static final int VAR_BYTES = 4 << 20;
- static final int MAX_MAP = 5;
- static final int MAX_RED = 3;
-
- static void initDist(Random r, double min, int[] recs, long[] bytes,
- long tot_recs, long tot_bytes) {
- final double[] recs_dist = getDistr(r, min, recs.length);
- final double[] bytes_dist = getDistr(r, min, recs.length);
- long totalbytes = 0L;
- int totalrecs = 0;
- for (int i = 0; i < recs.length; ++i) {
- recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
- bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
- totalrecs += recs[i];
- totalbytes += bytes[i];
- }
- // Add/remove excess
- recs[0] += totalrecs - tot_recs;
- bytes[0] += totalbytes - tot_bytes;
- if (LOG.isInfoEnabled()) {
- LOG.info("DIST: " + Arrays.toString(recs) + " " +
- tot_recs + "/" + totalrecs + " " +
- Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
- }
- }
-
- private static final AtomicInteger seq = new AtomicInteger(0);
- // set timestamps in the past
- private static final AtomicLong timestamp =
- new AtomicLong(System.currentTimeMillis() -
- TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
-
- private final int id;
- private final String name;
- private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
- private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
- private final long submitTime;
-
- public MockJob(Configuration conf) {
- final Random r = new Random();
- final long seed = r.nextLong();
- r.setSeed(seed);
- id = seq.getAndIncrement();
- name = String.format("MOCKJOB%05d", id);
- LOG.info(name + " (" + seed + ")");
- submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
- r.nextInt(10), TimeUnit.SECONDS));
-
- m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
- m_bytesIn = new long[m_recsIn.length];
- m_recsOut = new int[m_recsIn.length];
- m_bytesOut = new long[m_recsIn.length];
-
- r_recsIn = new int[r.nextInt(MAX_RED) + 1];
- r_bytesIn = new long[r_recsIn.length];
- r_recsOut = new int[r_recsIn.length];
- r_bytesOut = new long[r_recsIn.length];
-
- // map input
- final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
- final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
- initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
-
- // shuffle
- final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
- final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
- initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
- initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
-
- // reduce output
- final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
- final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
- initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
-
- if (LOG.isDebugEnabled()) {
- int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
- int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
- for (int i = 0; i < m_recsIn.length; ++i) {
- iMapRTotal += m_recsIn[i];
- iMapBTotal += m_bytesIn[i];
- oMapRTotal += m_recsOut[i];
- oMapBTotal += m_bytesOut[i];
- }
- for (int i = 0; i < r_recsIn.length; ++i) {
- iRedRTotal += r_recsIn[i];
- iRedBTotal += r_bytesIn[i];
- oRedRTotal += r_recsOut[i];
- oRedBTotal += r_bytesOut[i];
- }
- LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
- " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
- m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
- r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
- submitTime));
- }
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getUser() {
- return String.format("foobar%d", id);
- }
-
- @Override
- public JobID getJobID() {
- return new JobID("job_mock_" + name, id);
- }
-
- @Override
- public Values getOutcome() {
- return Values.SUCCESS;
- }
-
- @Override
- public long getSubmissionTime() {
- return submitTime;
+ static class DebugReplayJobFactory extends ReplayJobFactory
+ implements Debuggable {
+ public DebugReplayJobFactory(
+ JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+ CountDownLatch startFlag,UserResolver resolver) throws IOException {
+ super(
+ submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+ startFlag,resolver);
}
@Override
- public int getNumberMaps() {
- return m_bytesIn.length;
+ public ArrayList<JobStory> getSubmitted() {
+ return ((DebugJobProducer) jobProducer).submitted;
}
- @Override
- public int getNumberReduces() {
- return r_bytesIn.length;
- }
+ }
- @Override
- public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
- switch (taskType) {
- case MAP:
- return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
- m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
- case REDUCE:
- return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
- r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
- default:
- throw new IllegalArgumentException("Not interested");
- }
+ static class DebugSerialJobFactory extends SerialJobFactory
+ implements Debuggable {
+ public DebugSerialJobFactory(
+ JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+ CountDownLatch startFlag,UserResolver resolver) throws IOException {
+ super(
+ submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+ startFlag,resolver);
}
@Override
- public InputSplit[] getInputSplits() {
- throw new UnsupportedOperationException();
+ public ArrayList<JobStory> getSubmitted() {
+ return ((DebugJobProducer) jobProducer).submitted;
}
- @Override
- public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
- int taskNumber, int taskAttemptNumber) {
- throw new UnsupportedOperationException();
- }
+ }
- @Override
- public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
- int taskAttemptNumber, int locality) {
- throw new UnsupportedOperationException();
+ static class DebugStressJobFactory extends StressJobFactory
+ implements Debuggable {
+ public DebugStressJobFactory(
+ JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+ CountDownLatch startFlag,UserResolver resolver) throws IOException {
+ super(
+ submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+ startFlag,resolver);
}
@Override
- public org.apache.hadoop.mapred.JobConf getJobConf() {
- throw new UnsupportedOperationException();
+ public ArrayList<JobStory> getSubmitted() {
+ return ((DebugJobProducer) jobProducer).submitted;
}
}
+
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Fri Mar 4 04:08:06 2011
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class DebugJobProducer implements JobStoryProducer {
+ public static final Log LOG = LogFactory.getLog(DebugJobProducer.class);
+ final ArrayList<JobStory> submitted;
+ private final Configuration conf;
+ private final AtomicInteger numJobs;
+
+ public DebugJobProducer(int numJobs, Configuration conf) {
+ super();
+ MockJob.reset();
+ this.conf = conf;
+ this.numJobs = new AtomicInteger(numJobs);
+ this.submitted = new ArrayList<JobStory>();
+ }
+
+ @Override
+ public JobStory getNextJob() throws IOException {
+ if (numJobs.getAndDecrement() > 0) {
+ final MockJob ret = new MockJob(conf);
+ submitted.add(ret);
+ return ret;
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+
+
+ static double[] getDistr(Random r, double mindist, int size) {
+ assert 0.0 <= mindist && mindist <= 1.0;
+ final double min = mindist / size;
+ final double rem = 1.0 - min * size;
+ final double[] tmp = new double[size];
+ for (int i = 0; i < tmp.length - 1; ++i) {
+ tmp[i] = r.nextDouble() * rem;
+ }
+ tmp[tmp.length - 1] = rem;
+ Arrays.sort(tmp);
+
+ final double[] ret = new double[size];
+ ret[0] = tmp[0] + min;
+ for (int i = 1; i < size; ++i) {
+ ret[i] = tmp[i] - tmp[i - 1] + min;
+ }
+ return ret;
+ }
+
+
+ /**
+ * Generate random task data for a synthetic job.
+ */
+ static class MockJob implements JobStory {
+
+ static final int MIN_REC = 1 << 14;
+ static final int MIN_BYTES = 1 << 20;
+ static final int VAR_REC = 1 << 14;
+ static final int VAR_BYTES = 4 << 20;
+ static final int MAX_MAP = 5;
+ static final int MAX_RED = 3;
+ final Configuration conf;
+
+ static void initDist(
+ Random r, double min, int[] recs, long[] bytes, long tot_recs,
+ long tot_bytes) {
+ final double[] recs_dist = getDistr(r, min, recs.length);
+ final double[] bytes_dist = getDistr(r, min, recs.length);
+ long totalbytes = 0L;
+ int totalrecs = 0;
+ for (int i = 0; i < recs.length; ++i) {
+ recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+ bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+ totalrecs += recs[i];
+ totalbytes += bytes[i];
+ }
+ // Add/remove excess
+ recs[0] += totalrecs - tot_recs;
+ bytes[0] += totalbytes - tot_bytes;
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "DIST: " + Arrays.toString(recs) + " " + tot_recs + "/" + totalrecs +
+ " " + Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+ }
+ }
+
+ private static final AtomicInteger seq = new AtomicInteger(0);
+ // set timestamp in the past
+ private static final AtomicLong timestamp = new AtomicLong(
+ System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+ 60, TimeUnit.DAYS));
+
+ private final int id;
+ private final String name;
+ private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
+ private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
+ private final long submitTime;
+
+ public MockJob(Configuration conf) {
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ id = seq.getAndIncrement();
+ name = String.format("MOCKJOB%05d", id);
+ this.conf = conf;
+ LOG.info(name + " (" + seed + ")");
+ submitTime = timestamp.addAndGet(
+ TimeUnit.MILLISECONDS.convert(
+ r.nextInt(10), TimeUnit.SECONDS));
+
+ m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+ m_bytesIn = new long[m_recsIn.length];
+ m_recsOut = new int[m_recsIn.length];
+ m_bytesOut = new long[m_recsIn.length];
+
+ r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+ r_bytesIn = new long[r_recsIn.length];
+ r_recsOut = new int[r_recsIn.length];
+ r_bytesOut = new long[r_recsIn.length];
+
+ // map input
+ final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+ // shuffle
+ final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.5, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+ initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+ // reduce output
+ final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.5, r_recsOut, r_bytesOut, red_recs, red_bytes);
+
+ if (LOG.isDebugEnabled()) {
+ int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+ int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+ for (int i = 0; i < m_recsIn.length; ++i) {
+ iMapRTotal += m_recsIn[i];
+ iMapBTotal += m_bytesIn[i];
+ oMapRTotal += m_recsOut[i];
+ oMapBTotal += m_bytesOut[i];
+ }
+ for (int i = 0; i < r_recsIn.length; ++i) {
+ iRedRTotal += r_recsIn[i];
+ iRedBTotal += r_bytesIn[i];
+ oRedRTotal += r_recsOut[i];
+ oRedBTotal += r_bytesOut[i];
+ }
+ LOG.debug(
+ String.format(
+ "%s: M (%03d) %6d/%10d -> %6d/%10d" +
+ " R (%03d) %6d/%10d -> %6d/%10d @%d", name, m_bytesIn.length,
+ iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal, r_bytesIn.length,
+ iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal, submitTime));
+ }
+ }
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getUser() {
+ String s = String.format("foobar%d", id);
+ GridmixTestUtils.createHomeAndStagingDirectory(s,(JobConf)conf);
+ return s;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return new JobID("job_mock_" + name, id);
+ }
+
+ @Override
+ public Values getOutcome() {
+ return Values.SUCCESS;
+ }
+
+ @Override
+ public long getSubmissionTime() {
+ return submitTime;
+ }
+
+ @Override
+ public int getNumberMaps() {
+ return m_bytesIn.length;
+ }
+
+ @Override
+ public int getNumberReduces() {
+ return r_bytesIn.length;
+ }
+
+ @Override
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ switch (taskType) {
+ case MAP:
+ return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
+ m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
+ case REDUCE:
+ return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
+ r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
+ default:
+ throw new IllegalArgumentException("Not interested");
+ }
+ }
+
+ @Override
+ public InputSplit[] getInputSplits() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TaskAttemptInfo getTaskAttemptInfo(
+ TaskType taskType, int taskNumber, int taskAttemptNumber) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+ int taskNumber, int taskAttemptNumber, int locality) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.JobConf getJobConf() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static void reset() {
+ seq.set(0);
+ timestamp.set(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+ 60, TimeUnit.DAYS));
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java?rev=1077370&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/GridmixTestUtils.java Fri Mar 4 04:08:06 2011
@@ -0,0 +1,90 @@
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
+import org.apache.hadoop.security.Groups;
+
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class GridmixTestUtils {
+ static final Path DEST = new Path("/gridmix");
+ static FileSystem dfs = null;
+ static MiniDFSCluster dfsCluster = null;
+ static MiniMRCluster mrCluster = null;
+
+ public static void initCluster() throws IOException {
+ Configuration conf = new Configuration();
+ dfsCluster = new MiniDFSCluster(conf, 3, true, null);
+ dfs = dfsCluster.getFileSystem();
+ mrCluster = new MiniMRCluster(
+ 3, dfs.getUri().toString(), 1, null, null, new JobConf(conf));
+ }
+
+ public static void shutdownCluster() throws IOException {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+
+ /**
+ * Methods to generate the home directory for dummy users.
+ *
+ * @param conf
+ */
+ public static void createHomeAndStagingDirectory(String user, JobConf conf) {
+ try {
+ FileSystem fs = dfsCluster.getFileSystem();
+ String path = "/user/" + user;
+ Path homeDirectory = new Path(path);
+ if(fs.exists(homeDirectory)) {
+ fs.delete(homeDirectory,true);
+ }
+ TestGridmixSubmission.LOG.info(
+ "Creating Home directory : " + homeDirectory);
+ fs.mkdirs(homeDirectory);
+ changePermission(user,homeDirectory, fs);
+ Path stagingArea = new Path(
+ conf.get(
+ "mapreduce.jobtracker.staging.root.dir",
+ "/tmp/hadoop/mapred/staging"));
+ TestGridmixSubmission.LOG.info(
+ "Creating Staging root directory : " + stagingArea);
+ fs.mkdirs(stagingArea);
+ fs.setPermission(stagingArea, new FsPermission((short) 0777));
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+
+ static void changePermission(String user, Path homeDirectory, FileSystem fs)
+ throws IOException {
+ fs.setOwner(homeDirectory, user, "");
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar 4 04:08:06 2011
@@ -28,15 +28,14 @@ import java.util.concurrent.LinkedBlocki
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Job;
@@ -46,16 +45,22 @@ import org.apache.hadoop.tools.rumen.Tas
import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.mapred.Task.Counter.*;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
import static org.junit.Assert.*;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.log4j.Level;
+import org.apache.hadoop.security.UserGroupInformation;
+import java.security.PrivilegedExceptionAction;
public class TestGridmixSubmission {
+ static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+ public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
{
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
).getLogger().setLevel(Level.DEBUG);
@@ -72,22 +77,13 @@ public class TestGridmixSubmission {
private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
@BeforeClass
- public static void initCluster() throws IOException {
- Configuration conf = new Configuration();
- dfsCluster = new MiniDFSCluster(conf, 3, true, null);
- dfs = dfsCluster.getFileSystem();
- mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
- new JobConf(conf));
+ public static void init() throws IOException {
+ GridmixTestUtils.initCluster();
}
@AfterClass
- public static void shutdownCluster() throws IOException {
- if (mrCluster != null) {
- mrCluster.shutdown();
- }
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- }
+ public static void shutDown() throws IOException {
+ GridmixTestUtils.shutdownCluster();
}
static class TestMonitor extends JobMonitor {
@@ -96,8 +92,8 @@ public class TestGridmixSubmission {
private final int expected;
private final BlockingQueue<Job> retiredJobs;
- public TestMonitor(int expected) {
- super();
+ public TestMonitor(int expected, Statistics stats) {
+ super(stats);
this.expected = expected;
retiredJobs = new LinkedBlockingQueue<Job>();
}
@@ -109,17 +105,17 @@ public class TestGridmixSubmission {
for (JobStory spec : submitted) {
sub.put(spec.getName(), spec);
}
- final JobClient client = new JobClient(mrCluster.createJobConf());
+ final JobClient client = new JobClient(GridmixTestUtils.mrCluster.createJobConf());
for (Job job : succeeded) {
final String jobname = job.getJobName();
if ("GRIDMIX_GENDATA".equals(jobname)) {
- final Path in = new Path("foo").makeQualified(dfs);
- final Path out = new Path("/gridmix").makeQualified(dfs);
- final ContentSummary generated = dfs.getContentSummary(in);
+ final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+ final Path out = new Path("/gridmix").makeQualified(GridmixTestUtils.dfs);
+ final ContentSummary generated = GridmixTestUtils.dfs.getContentSummary(in);
assertTrue("Mismatched data gen", // +/- 100k for logs
(GENDATA << 20) < generated.getLength() + GENSLOP ||
(GENDATA << 20) > generated.getLength() - GENSLOP);
- FileStatus[] outstat = dfs.listStatus(out);
+ FileStatus[] outstat = GridmixTestUtils.dfs.listStatus(out);
assertEquals("Mismatched job count", NJOBS, outstat.length);
continue;
}
@@ -128,7 +124,8 @@ public class TestGridmixSubmission {
assertNotNull("No spec for " + job.getJobName(), spec);
assertNotNull("No counters for " + job.getJobName(), job.getCounters());
final String specname = spec.getName();
- final FileStatus stat = dfs.getFileStatus(new Path(DEST, "" +
+ final FileStatus stat = GridmixTestUtils.dfs.getFileStatus(new Path(
+ GridmixTestUtils.DEST, "" +
Integer.valueOf(specname.substring(specname.length() - 5))));
assertEquals("Wrong owner for " + job.getJobName(), spec.getUser(),
stat.getOwner());
@@ -287,35 +284,62 @@ public class TestGridmixSubmission {
static class DebugGridmix extends Gridmix {
- private DebugJobFactory factory;
+ private JobFactory factory;
private TestMonitor monitor;
public void checkMonitor() throws Exception {
- monitor.verify(factory.getSubmitted());
+ monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted());
}
@Override
- protected JobMonitor createJobMonitor() {
- monitor = new TestMonitor(NJOBS + 1); // include data generation job
+ protected JobMonitor createJobMonitor(Statistics stats) {
+ Configuration conf = new Configuration();
+ conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
+ monitor = new TestMonitor(NJOBS + 1, stats);
return monitor;
}
-
+
@Override
- protected JobFactory createJobFactory(JobSubmitter submitter,
- String traceIn, Path scratchDir, Configuration conf,
- CountDownLatch startFlag, UserResolver userResolver)
+ protected JobFactory createJobFactory(
+ JobSubmitter submitter, String traceIn, Path scratchDir, Configuration conf,
+ CountDownLatch startFlag, UserResolver userResolver)
throws IOException {
- factory =
- new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag,
- userResolver);
+ factory = DebugJobFactory.getFactory(
+ submitter, scratchDir, NJOBS, conf, startFlag, userResolver);
return factory;
}
}
@Test
- public void testSubmit() throws Exception {
- final Path in = new Path("foo").makeQualified(dfs);
- final Path out = DEST.makeQualified(dfs);
+ public void testReplaySubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.REPLAY;
+ System.out.println(" Replay started at " + System.currentTimeMillis());
+ doSubmission();
+ System.out.println(" Replay ended at " + System.currentTimeMillis());
+ }
+
+ @Test
+ public void testStressSubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.STRESS;
+ System.out.println(" Stress started at " + System.currentTimeMillis());
+ doSubmission();
+ System.out.println(" Stress ended at " + System.currentTimeMillis());
+ }
+
+ @Test
+ public void testSerialSubmit() throws Exception {
+ policy = GridmixJobSubmissionPolicy.SERIAL;
+ System.out.println("Serial started at " + System.currentTimeMillis());
+ doSubmission();
+ System.out.println("Serial ended at " + System.currentTimeMillis());
+ }
+
+ private void doSubmission() throws Exception {
+ final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs);
+ final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs);
+ final Path root = new Path("/user");
+ Configuration conf = null;
+ try{
final String[] argv = {
"-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
"-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
@@ -325,14 +349,24 @@ public class TestGridmixSubmission {
"-" // ignored by DebugGridmix
};
DebugGridmix client = new DebugGridmix();
- final Configuration conf = mrCluster.createJobConf();
+ conf = new Configuration();
+ conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy.name());
+ conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+// GridmixTestUtils.createHomeAndStagingDirectory((JobConf)conf);
// allow synthetic users to create home directories
- final Path root = new Path("/user");
- dfs.mkdirs(root, new FsPermission((short)0777));
- dfs.setPermission(root, new FsPermission((short)0777));
+ GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
+ GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));
int res = ToolRunner.run(conf, client, argv);
assertEquals("Client exited with nonzero status", 0, res);
client.checkMonitor();
- }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ in.getFileSystem(conf).delete(in, true);
+ out.getFileSystem(conf).delete(out, true);
+ root.getFileSystem(conf).delete(root,true);
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java?rev=1077370&r1=1077369&r2=1077370&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestUserResolve.java Fri Mar 4 04:08:06 2011
@@ -72,17 +72,13 @@ public class TestUserResolve {
assertTrue("User list required for RoundRobinUserResolver", fail);
rslv.setTargetUsers(new URI(userlist.toString()), conf);
- assertEquals("user0", rslv.getTargetUgi("hfre0").getUserName());
- assertEquals("user1", rslv.getTargetUgi("hfre1").getUserName());
- assertEquals("user2", rslv.getTargetUgi("hfre2").getUserName());
- assertEquals("user0", rslv.getTargetUgi("hfre0").getUserName());
- assertEquals("user3", rslv.getTargetUgi("hfre3").getUserName());
- assertEquals("user0", rslv.getTargetUgi("hfre0").getUserName());
- assertEquals("user0", rslv.getTargetUgi("hfre4").getUserName());
- assertArrayEquals(new String[] { "groupA", "groupB", "groupC" },
- rslv.getTargetUgi("hfre0").getGroupNames());
- assertArrayEquals(new String[] { "groupB" },
- rslv.getTargetUgi("hfre2").getGroupNames());
+ assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre0")).getUserName());
+ assertEquals("user1", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre1")).getUserName());
+ assertEquals("user2", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre2")).getUserName());
+ assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre0")).getUserName());
+ assertEquals("user3", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre3")).getUserName());
+ assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre0")).getUserName());
+ assertEquals("user0", rslv.getTargetUgi(UserGroupInformation.createRemoteUser("hfre4")).getUserName());
}
@Test
@@ -90,8 +86,12 @@ public class TestUserResolve {
final Configuration conf = new Configuration();
final UserResolver rslv = new SubmitterUserResolver();
rslv.setTargetUsers(null, conf);
- assertEquals(UnixUserGroupInformation.login(),
- rslv.getTargetUgi((UserGroupInformation)null));
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ assertEquals(ugi, rslv.getTargetUgi((UserGroupInformation)null));
+ System.out.println(" Submitter current user " + ugi);
+ System.out.println(
+ " Target ugi " + rslv.getTargetUgi(
+ (UserGroupInformation) null));
}
}