Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:38:23 UTC
svn commit: r1077079 [3/11] - in
/hadoop/common/branches/branch-0.20-security-patches: ./ src/contrib/
src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/
src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/
src/contrib/gr...
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+
+
+/**
+ * Component generating random job traces for testing on a single node.
+ */
+class DebugJobFactory extends JobFactory {
+
+ public DebugJobFactory(JobSubmitter submitter, Path scratch, int numJobs,
+ Configuration conf, CountDownLatch startFlag) throws IOException {
+ super(submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+ startFlag);
+ }
+
+ ArrayList<JobStory> getSubmitted() {
+ return ((DebugJobProducer)jobProducer).submitted;
+ }
+
+ private static class DebugJobProducer implements JobStoryProducer {
+ final ArrayList<JobStory> submitted;
+ private final Configuration conf;
+ private final AtomicInteger numJobs;
+
+ public DebugJobProducer(int numJobs, Configuration conf) {
+ super();
+ this.conf = conf;
+ this.numJobs = new AtomicInteger(numJobs);
+ this.submitted = new ArrayList<JobStory>();
+ }
+
+ @Override
+ public JobStory getNextJob() throws IOException {
+ if (numJobs.getAndDecrement() > 0) {
+ final MockJob ret = new MockJob(conf);
+ submitted.add(ret);
+ return ret;
+ }
+ return null;
+ }
+
+ @Override
+ public void close() { }
+ }
+
+ static double[] getDistr(Random r, double mindist, int size) {
+ assert 0.0 <= mindist && mindist <= 1.0;
+ final double min = mindist / size;
+ final double rem = 1.0 - min * size;
+ final double[] tmp = new double[size];
+ for (int i = 0; i < tmp.length - 1; ++i) {
+ tmp[i] = r.nextDouble() * rem;
+ }
+ tmp[tmp.length - 1] = rem;
+ Arrays.sort(tmp);
+
+ final double[] ret = new double[size];
+ ret[0] = tmp[0] + min;
+ for (int i = 1; i < size; ++i) {
+ ret[i] = tmp[i] - tmp[i-1] + min;
+ }
+ return ret;
+ }
+
+ /**
+ * Generate random task data for a synthetic job.
+ */
+ static class MockJob implements JobStory {
+
+ static final int MIN_REC = 1 << 14;
+ static final int MIN_BYTES = 1 << 20;
+ static final int VAR_REC = 1 << 14;
+ static final int VAR_BYTES = 4 << 20;
+ static final int MAX_MAP = 5;
+ static final int MAX_RED = 3;
+
+ static void initDist(Random r, double min, int[] recs, long[] bytes,
+ long tot_recs, long tot_bytes) {
+ final double[] recs_dist = getDistr(r, min, recs.length);
+ final double[] bytes_dist = getDistr(r, min, recs.length);
+ long totalbytes = 0L;
+ int totalrecs = 0;
+ for (int i = 0; i < recs.length; ++i) {
+ recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+ bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+ totalrecs += recs[i];
+ totalbytes += bytes[i];
+ }
+      // Fold the rounding error into the first bucket so totals match exactly
+      recs[0] += tot_recs - totalrecs;
+      bytes[0] += tot_bytes - totalbytes;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("DIST: " + Arrays.toString(recs) + " " +
+ tot_recs + "/" + totalrecs + " " +
+ Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+ }
+ }
+
+ private static final AtomicInteger seq = new AtomicInteger(0);
+ // set timestamps in the past
+ private static final AtomicLong timestamp =
+ new AtomicLong(System.currentTimeMillis() -
+ TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
+
+ private final int id;
+ private final String name;
+ private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
+ private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
+ private final long submitTime;
+
+ public MockJob(Configuration conf) {
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ id = seq.getAndIncrement();
+ name = String.format("MOCKJOB%05d", id);
+ LOG.info(name + " (" + seed + ")");
+ submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
+ r.nextInt(10), TimeUnit.SECONDS));
+
+ m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+ m_bytesIn = new long[m_recsIn.length];
+ m_recsOut = new int[m_recsIn.length];
+ m_bytesOut = new long[m_recsIn.length];
+
+ r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+ r_bytesIn = new long[r_recsIn.length];
+ r_recsOut = new int[r_recsIn.length];
+ r_bytesOut = new long[r_recsIn.length];
+
+ // map input
+ final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+ // shuffle
+ final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+ initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+ // reduce output
+ final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+ final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+ initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
+
+ if (LOG.isDebugEnabled()) {
+ int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+ int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+ for (int i = 0; i < m_recsIn.length; ++i) {
+ iMapRTotal += m_recsIn[i];
+ iMapBTotal += m_bytesIn[i];
+ oMapRTotal += m_recsOut[i];
+ oMapBTotal += m_bytesOut[i];
+ }
+ for (int i = 0; i < r_recsIn.length; ++i) {
+ iRedRTotal += r_recsIn[i];
+ iRedBTotal += r_bytesIn[i];
+ oRedRTotal += r_recsOut[i];
+ oRedBTotal += r_bytesOut[i];
+ }
+ LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
+ " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
+ m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
+ r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
+ submitTime));
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getUser() {
+ return "FOOBAR";
+ }
+
+ @Override
+ public JobID getJobID() {
+ return new JobID("job_mock_" + name, id);
+ }
+
+ @Override
+ public Values getOutcome() {
+ return Values.SUCCESS;
+ }
+
+ @Override
+ public long getSubmissionTime() {
+ return submitTime;
+ }
+
+ @Override
+ public int getNumberMaps() {
+ return m_bytesIn.length;
+ }
+
+ @Override
+ public int getNumberReduces() {
+ return r_bytesIn.length;
+ }
+
+ @Override
+ public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+ switch (taskType) {
+ case MAP:
+ return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
+ m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
+ case REDUCE:
+ return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
+ r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
+ default:
+ throw new IllegalArgumentException("Not interested");
+ }
+ }
+
+ @Override
+ public InputSplit[] getInputSplits() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
+ int taskNumber, int taskAttemptNumber) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+ int taskAttemptNumber, int locality) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.JobConf getJobConf() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
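For reference, getDistr above returns one share per bucket, each at or above mindist/size, and the shares sum to exactly 1.0: the sorted cut points split the remainder rem across buckets while the final element pins the total. A minimal sketch of that contract; GetDistrSketch is a hypothetical class assumed to sit in the same package so it can reach the package-private helper:

    package org.apache.hadoop.mapred.gridmix;

    import java.util.Random;

    public class GetDistrSketch {
      public static void main(String[] args) {
        // mindist = 0.5 over size = 4 buckets: every share >= 0.125
        double[] d = DebugJobFactory.getDistr(new Random(), 0.5, 4);
        double sum = 0.0;
        for (double v : d) {
          if (v < 0.5 / 4) throw new AssertionError("share below mindist/size");
          sum += v;
        }
        // the shares form a complete distribution (exact up to FP rounding)
        if (Math.abs(sum - 1.0) > 1e-9) throw new AssertionError("sum != 1.0");
      }
    }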
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFilePool.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestFilePool {
+
+  static final Log LOG = LogFactory.getLog(TestFilePool.class);
+ static final int NFILES = 26;
+ static final Path base = getBaseDir();
+
+ static Path getBaseDir() {
+ try {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ return new Path(System.getProperty("test.build.data", "/tmp"),
+ "testFilePool").makeQualified(fs);
+ } catch (IOException e) {
+ fail();
+ }
+ return null;
+ }
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ fs.delete(base, true);
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info("seed: " + seed);
+ fs.mkdirs(base);
+ for (int i = 0; i < NFILES; ++i) {
+ Path file = base;
+ for (double d = 0.6; d > 0.0; d *= 0.8) {
+ if (r.nextDouble() < d) {
+ file = new Path(base, Integer.toString(r.nextInt(3)));
+ continue;
+ }
+ break;
+ }
+ OutputStream out = null;
+ try {
+ out = fs.create(new Path(file, "" + (char)('A' + i)));
+ final byte[] b = new byte[1024];
+ Arrays.fill(b, (byte)('A' + i));
+ for (int len = ((i % 13) + 1) * 1024; len > 0; len -= 1024) {
+ out.write(b);
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ fs.delete(base, true);
+ }
+
+ @Test
+ public void testUnsuitable() throws Exception {
+ try {
+ final Configuration conf = new Configuration();
+ // all files 13k or less
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 14 * 1024);
+ final FilePool pool = new FilePool(conf, base);
+ pool.refresh();
+ } catch (IOException e) {
+ return;
+ }
+ fail();
+ }
+
+ @Test
+ public void testPool() throws Exception {
+ final Random r = new Random();
+ final Configuration conf = new Configuration();
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+ final FilePool pool = new FilePool(conf, base);
+ pool.refresh();
+ final ArrayList<FileStatus> files = new ArrayList<FileStatus>();
+
+    // ensure 1k and 2k files excluded (sizes 1k..13k twice = 182k, minus 6k)
+ final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+ assertEquals(expectedPoolSize, pool.getInputFiles(Long.MAX_VALUE, files));
+ assertEquals(NFILES - 4, files.size());
+
+ // exact match
+ files.clear();
+ assertEquals(expectedPoolSize, pool.getInputFiles(expectedPoolSize, files));
+
+    // match a random size to within the largest file (13k)
+ files.clear();
+ final long rand = r.nextInt(expectedPoolSize);
+ assertTrue("Missed: " + rand,
+ (NFILES / 2) * 1024 > rand - pool.getInputFiles(rand, files));
+
+ // all files
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 0);
+ pool.refresh();
+ files.clear();
+ assertEquals((NFILES / 2 * (NFILES / 2 + 1)) * 1024,
+ pool.getInputFiles(Long.MAX_VALUE, files));
+ }
+
+ void checkSplitEq(FileSystem fs, CombineFileSplit split, long bytes)
+ throws Exception {
+ long splitBytes = 0L;
+ HashSet<Path> uniq = new HashSet<Path>();
+ for (int i = 0; i < split.getNumPaths(); ++i) {
+ splitBytes += split.getLength(i);
+ assertTrue(
+ split.getLength(i) <= fs.getFileStatus(split.getPath(i)).getLen());
+ assertFalse(uniq.contains(split.getPath(i)));
+ uniq.add(split.getPath(i));
+ }
+ assertEquals(bytes, splitBytes);
+ }
+
+ @Test
+ public void testStriper() throws Exception {
+ final Random r = new Random();
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ conf.setLong(FilePool.GRIDMIX_MIN_FILE, 3 * 1024);
+ final FilePool pool = new FilePool(conf, base) {
+ @Override
+ public BlockLocation[] locationsFor(FileStatus stat, long start, long len)
+ throws IOException {
+ return new BlockLocation[] { new BlockLocation() };
+ }
+ };
+ pool.refresh();
+
+ final int expectedPoolSize = (NFILES / 2 * (NFILES / 2 + 1) - 6) * 1024;
+ final InputStriper striper = new InputStriper(pool, expectedPoolSize);
+ int last = 0;
+ for (int i = 0; i < expectedPoolSize;
+ last = Math.min(expectedPoolSize - i, r.nextInt(expectedPoolSize))) {
+ checkSplitEq(fs, striper.splitFor(pool, last, 0), last);
+ i += last;
+ }
+ final InputStriper striper2 = new InputStriper(pool, expectedPoolSize);
+ checkSplitEq(fs, striper2.splitFor(pool, expectedPoolSize, 0),
+ expectedPoolSize);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestFileQueue.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class TestFileQueue {
+
+ static final Log LOG = LogFactory.getLog(TestFileQueue.class);
+ static final int NFILES = 4;
+ static final int BLOCK = 256;
+ static final Path[] paths = new Path[NFILES];
+ static final String[] loc = new String[NFILES];
+ static final long[] start = new long[NFILES];
+ static final long[] len = new long[NFILES];
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "testFileQueue").makeQualified(fs);
+ fs.delete(p, true);
+ final byte[] b = new byte[BLOCK];
+ for (int i = 0; i < NFILES; ++i) {
+ Arrays.fill(b, (byte)('A' + i));
+ paths[i] = new Path(p, "" + (char)('A' + i));
+ OutputStream f = null;
+ try {
+ f = fs.create(paths[i]);
+ f.write(b);
+ } finally {
+ if (f != null) {
+ f.close();
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ final Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "testFileQueue").makeQualified(fs);
+ fs.delete(p, true);
+ }
+
+ static ByteArrayOutputStream fillVerif() throws IOException {
+ final byte[] b = new byte[BLOCK];
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ for (int i = 0; i < NFILES; ++i) {
+ Arrays.fill(b, (byte)('A' + i));
+ out.write(b, 0, (int)len[i]);
+ }
+ return out;
+ }
+
+ @Test
+ public void testRepeat() throws Exception {
+ final Configuration conf = new Configuration();
+ Arrays.fill(loc, "");
+ Arrays.fill(start, 0L);
+ Arrays.fill(len, BLOCK);
+
+ final ByteArrayOutputStream out = fillVerif();
+ final FileQueue q =
+ new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+ final byte[] verif = out.toByteArray();
+ final byte[] check = new byte[2 * NFILES * BLOCK];
+ q.read(check, 0, NFILES * BLOCK);
+ assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK));
+
+ final byte[] verif2 = new byte[2 * NFILES * BLOCK];
+ System.arraycopy(verif, 0, verif2, 0, verif.length);
+ System.arraycopy(verif, 0, verif2, verif.length, verif.length);
+ q.read(check, 0, 2 * NFILES * BLOCK);
+ assertArrayEquals(verif2, check);
+
+ }
+
+ @Test
+ public void testUneven() throws Exception {
+ final Configuration conf = new Configuration();
+ Arrays.fill(loc, "");
+ Arrays.fill(start, 0L);
+ Arrays.fill(len, BLOCK);
+
+ final int B2 = BLOCK / 2;
+ for (int i = 0; i < NFILES; i += 2) {
+ start[i] += B2;
+ len[i] -= B2;
+ }
+ final FileQueue q =
+ new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
+ final ByteArrayOutputStream out = fillVerif();
+ final byte[] verif = out.toByteArray();
+ final byte[] check = new byte[NFILES / 2 * BLOCK + NFILES / 2 * B2];
+ q.read(check, 0, verif.length);
+ assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+ q.read(check, 0, verif.length);
+ assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
+ }
+
+ @Test
+ public void testEmpty() throws Exception {
+ final Configuration conf = new Configuration();
+ // verify OK if unused
+ final FileQueue q = new FileQueue(new CombineFileSplit(
+ new Path[0], new long[0], new long[0], new String[0]), conf);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TestGridmixRecord {
+ private static final Log LOG = LogFactory.getLog(TestGridmixRecord.class);
+
+ static void lengthTest(GridmixRecord x, GridmixRecord y, int min,
+ int max) throws Exception {
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info("length: " + seed);
+ final DataInputBuffer in = new DataInputBuffer();
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ setSerialize(x, r.nextLong(), i, out1);
+ // check write
+ assertEquals(i, out1.getLength());
+ // write to stream
+ x.write(out2);
+ // check read
+ in.reset(out1.getData(), 0, out1.getLength());
+ y.readFields(in);
+ assertEquals(i, x.getSize());
+ assertEquals(i, y.getSize());
+ }
+ // check stream read
+ in.reset(out2.getData(), 0, out2.getLength());
+ for (int i = min; i < max; ++i) {
+ y.readFields(in);
+ assertEquals(i, y.getSize());
+ }
+ }
+
+ static void randomReplayTest(GridmixRecord x, GridmixRecord y, int min,
+ int max) throws Exception {
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info("randReplay: " + seed);
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ final int s = out1.getLength();
+ x.setSeed(r.nextLong());
+ x.setSize(i);
+ x.write(out1);
+ assertEquals(i, out1.getLength() - s);
+ }
+ final DataInputBuffer in = new DataInputBuffer();
+ in.reset(out1.getData(), 0, out1.getLength());
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ // deserialize written records, write to separate buffer
+ for (int i = min; i < max; ++i) {
+ final int s = in.getPosition();
+ y.readFields(in);
+ assertEquals(i, in.getPosition() - s);
+ y.write(out2);
+ }
+ // verify written contents match
+ assertEquals(out1.getLength(), out2.getLength());
+ // assumes that writes will grow buffer deterministically
+ assertEquals("Bad test", out1.getData().length, out2.getData().length);
+ assertArrayEquals(out1.getData(), out2.getData());
+ }
+
+ static void eqSeedTest(GridmixRecord x, GridmixRecord y, int max)
+ throws Exception {
+ final Random r = new Random();
+ final long s = r.nextLong();
+ r.setSeed(s);
+ LOG.info("eqSeed: " + s);
+ assertEquals(x.fixedBytes(), y.fixedBytes());
+ final int min = x.fixedBytes() + 1;
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ final long seed = r.nextLong();
+ setSerialize(x, seed, i, out1);
+ setSerialize(y, seed, i, out2);
+ assertEquals(x, y);
+ assertEquals(x.hashCode(), y.hashCode());
+
+ // verify written contents match
+ assertEquals(out1.getLength(), out2.getLength());
+ // assumes that writes will grow buffer deterministically
+ assertEquals("Bad test", out1.getData().length, out2.getData().length);
+ assertArrayEquals(out1.getData(), out2.getData());
+ }
+ }
+
+ static void binSortTest(GridmixRecord x, GridmixRecord y, int min,
+ int max, WritableComparator cmp) throws Exception {
+ final Random r = new Random();
+ final long s = r.nextLong();
+ r.setSeed(s);
+ LOG.info("sort: " + s);
+ final DataOutputBuffer out1 = new DataOutputBuffer();
+ final DataOutputBuffer out2 = new DataOutputBuffer();
+ for (int i = min; i < max; ++i) {
+ final long seed1 = r.nextLong();
+ setSerialize(x, seed1, i, out1);
+ assertEquals(0, x.compareSeed(seed1, Math.max(0, i - x.fixedBytes())));
+
+ final long seed2 = r.nextLong();
+ setSerialize(y, seed2, i, out2);
+ assertEquals(0, y.compareSeed(seed2, Math.max(0, i - x.fixedBytes())));
+
+ // for eq sized records, ensure byte cmp where req
+ final int chk = WritableComparator.compareBytes(
+ out1.getData(), 0, out1.getLength(),
+ out2.getData(), 0, out2.getLength());
+ assertEquals(chk, x.compareTo(y));
+ assertEquals(chk, cmp.compare(
+ out1.getData(), 0, out1.getLength(),
+ out2.getData(), 0, out2.getLength()));
+ // write second copy, compare eq
+ final int s1 = out1.getLength();
+ x.write(out1);
+ assertEquals(0, cmp.compare(out1.getData(), 0, s1,
+ out1.getData(), s1, out1.getLength() - s1));
+ final int s2 = out2.getLength();
+ y.write(out2);
+ assertEquals(0, cmp.compare(out2.getData(), 0, s2,
+ out2.getData(), s2, out2.getLength() - s2));
+ assertEquals(chk, cmp.compare(out1.getData(), 0, s1,
+ out2.getData(), s2, out2.getLength() - s2));
+ }
+ }
+
+ static void checkSpec(GridmixKey a, GridmixKey b) throws Exception {
+ final Random r = new Random();
+ final long s = r.nextLong();
+ r.setSeed(s);
+ LOG.info("spec: " + s);
+ final DataInputBuffer in = new DataInputBuffer();
+ final DataOutputBuffer out = new DataOutputBuffer();
+ a.setType(GridmixKey.REDUCE_SPEC);
+ b.setType(GridmixKey.REDUCE_SPEC);
+ for (int i = 0; i < 100; ++i) {
+ final int in_rec = r.nextInt(Integer.MAX_VALUE);
+ a.setReduceInputRecords(in_rec);
+ final int out_rec = r.nextInt(Integer.MAX_VALUE);
+ a.setReduceOutputRecords(out_rec);
+ final int out_bytes = r.nextInt(Integer.MAX_VALUE);
+ a.setReduceOutputBytes(out_bytes);
+ final int min = WritableUtils.getVIntSize(in_rec)
+ + WritableUtils.getVIntSize(out_rec)
+ + WritableUtils.getVIntSize(out_bytes);
+ assertEquals(min + 2, a.fixedBytes()); // meta + vint min
+ final int size = r.nextInt(1024) + a.fixedBytes() + 1;
+ setSerialize(a, r.nextLong(), size, out);
+ assertEquals(size, out.getLength());
+ assertTrue(a.equals(a));
+ assertEquals(0, a.compareTo(a));
+
+ in.reset(out.getData(), 0, out.getLength());
+
+ b.readFields(in);
+ assertEquals(size, b.getSize());
+ assertEquals(in_rec, b.getReduceInputRecords());
+ assertEquals(out_rec, b.getReduceOutputRecords());
+ assertEquals(out_bytes, b.getReduceOutputBytes());
+ assertTrue(a.equals(b));
+ assertEquals(0, a.compareTo(b));
+ assertEquals(a.hashCode(), b.hashCode());
+ }
+ }
+
+ static void setSerialize(GridmixRecord x, long seed, int size,
+ DataOutputBuffer out) throws IOException {
+ x.setSeed(seed);
+ x.setSize(size);
+ out.reset();
+ x.write(out);
+ }
+
+ @Test
+ public void testKeySpec() throws Exception {
+ final int min = 5;
+ final int max = 300;
+ final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+ final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
+ lengthTest(a, b, min, max);
+ randomReplayTest(a, b, min, max);
+ binSortTest(a, b, min, max, new GridmixKey.Comparator());
+ // 2 fixed GR bytes, 1 type, 3 spec
+ eqSeedTest(a, b, max);
+ checkSpec(a, b);
+ }
+
+ @Test
+ public void testKeyData() throws Exception {
+ final int min = 2;
+ final int max = 300;
+ final GridmixKey a = new GridmixKey(GridmixKey.DATA, 1, 0L);
+ final GridmixKey b = new GridmixKey(GridmixKey.DATA, 1, 0L);
+ lengthTest(a, b, min, max);
+ randomReplayTest(a, b, min, max);
+ binSortTest(a, b, min, max, new GridmixKey.Comparator());
+ // 2 fixed GR bytes, 1 type
+ eqSeedTest(a, b, 300);
+ }
+
+ @Test
+ public void testBaseRecord() throws Exception {
+ final int min = 1;
+ final int max = 300;
+ final GridmixRecord a = new GridmixRecord();
+ final GridmixRecord b = new GridmixRecord();
+ lengthTest(a, b, min, max);
+ randomReplayTest(a, b, min, max);
+ binSortTest(a, b, min, max, new GridmixRecord.Comparator());
+ // 2 fixed GR bytes
+ eqSeedTest(a, b, 300);
+ }
+
+ public static void main(String[] argv) throws Exception {
+ boolean fail = false;
+ final TestGridmixRecord test = new TestGridmixRecord();
+ try { test.testKeySpec(); } catch (Exception e) {
+ fail = true;
+ e.printStackTrace();
+ }
+ try {test.testKeyData(); } catch (Exception e) {
+ fail = true;
+ e.printStackTrace();
+ }
+ try {test.testBaseRecord(); } catch (Exception e) {
+ fail = true;
+ e.printStackTrace();
+ }
+ System.exit(fail ? -1 : 0);
+ }
+
+ static void printDebug(GridmixRecord a, GridmixRecord b) throws IOException {
+ DataOutputBuffer out = new DataOutputBuffer();
+ a.write(out);
+ System.out.println("A " +
+ Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+ out.reset();
+ b.write(out);
+ System.out.println("B " +
+ Arrays.toString(Arrays.copyOf(out.getData(), out.getLength())));
+ }
+
+}
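As eqSeedTest above verifies, a GridmixRecord's serialized bytes are a pure function of its seed and size: two records configured identically write byte-identical output. A minimal determinism sketch built on the test's own setSerialize helper; SeedDeterminismSketch is a hypothetical class assumed to live in the same package:

    package org.apache.hadoop.mapred.gridmix;

    import java.util.Arrays;

    import org.apache.hadoop.io.DataOutputBuffer;

    public class SeedDeterminismSketch {
      public static void main(String[] args) throws Exception {
        final DataOutputBuffer out1 = new DataOutputBuffer();
        final DataOutputBuffer out2 = new DataOutputBuffer();
        // two fresh records, same seed and size...
        TestGridmixRecord.setSerialize(new GridmixRecord(), 42L, 64, out1);
        TestGridmixRecord.setSerialize(new GridmixRecord(), 42L, 64, out2);
        // ...serialize to byte-identical output
        final byte[] b1 = Arrays.copyOf(out1.getData(), out1.getLength());
        final byte[] b2 = Arrays.copyOf(out2.getData(), out2.getLength());
        if (!Arrays.equals(b1, b2)) throw new AssertionError("not deterministic");
      }
    }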
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.mapred.Task.Counter.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+public class TestGridmixSubmission {
+ {
+ ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
+ ).getLogger().setLevel(Level.DEBUG);
+ }
+
+ private static FileSystem dfs = null;
+ private static MiniDFSCluster dfsCluster = null;
+ private static MiniMRCluster mrCluster = null;
+
+ private static final int NJOBS = 2;
+ private static final long GENDATA = 50; // in megabytes
+ private static final int GENSLOP = 100 * 1024; // +/- 100k for logs
+
+ @BeforeClass
+ public static void initCluster() throws IOException {
+ Configuration conf = new Configuration();
+ dfsCluster = new MiniDFSCluster(conf, 3, true, null);
+ dfs = dfsCluster.getFileSystem();
+ mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ }
+
+ @AfterClass
+ public static void shutdownCluster() throws IOException {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+
+ static class TestMonitor extends JobMonitor {
+
+ static final long SLOPBYTES = 1024;
+ private final int expected;
+ private final BlockingQueue<Job> retiredJobs;
+
+ public TestMonitor(int expected) {
+ super();
+ this.expected = expected;
+ retiredJobs = new LinkedBlockingQueue<Job>();
+ }
+
+ public void verify(ArrayList<JobStory> submitted) throws Exception {
+ final ArrayList<Job> succeeded = new ArrayList<Job>();
+ assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+ final HashMap<String,JobStory> sub = new HashMap<String,JobStory>();
+ for (JobStory spec : submitted) {
+ sub.put(spec.getName(), spec);
+ }
+ final JobClient client = new JobClient(mrCluster.createJobConf());
+ for (Job job : succeeded) {
+ final String jobname = job.getJobName();
+ if ("GRIDMIX_GENDATA".equals(jobname)) {
+ final Path in = new Path("foo").makeQualified(dfs);
+ final Path out = new Path("/gridmix").makeQualified(dfs);
+ final ContentSummary generated = dfs.getContentSummary(in);
+ assertTrue("Mismatched data gen", // +/- 100k for logs
+ (GENDATA << 20) < generated.getLength() + GENSLOP ||
+ (GENDATA << 20) > generated.getLength() - GENSLOP);
+ FileStatus[] outstat = dfs.listStatus(out);
+ assertEquals("Mismatched job count", NJOBS, outstat.length);
+ continue;
+ }
+ final JobStory spec =
+ sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
+ assertNotNull("No spec for " + job.getJobName(), spec);
+ assertNotNull("No counters for " + job.getJobName(), job.getCounters());
+
+ final int nMaps = spec.getNumberMaps();
+ final int nReds = spec.getNumberReduces();
+
+ // TODO Blocked by MAPREDUCE-118
+ if (true) return;
+ // TODO
+ System.out.println(jobname + ": " + nMaps + "/" + nReds);
+ final TaskReport[] mReports =
+ client.getMapTaskReports(JobID.downgrade(job.getJobID()));
+ assertEquals("Mismatched map count", nMaps, mReports.length);
+ check(TaskType.MAP, job, spec, mReports,
+ 0, 0, SLOPBYTES, nReds);
+
+ final TaskReport[] rReports =
+ client.getReduceTaskReports(JobID.downgrade(job.getJobID()));
+ assertEquals("Mismatched reduce count", nReds, rReports.length);
+ check(TaskType.REDUCE, job, spec, rReports,
+ nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
+ }
+ }
+
+ public void check(final TaskType type, Job job, JobStory spec,
+ final TaskReport[] runTasks,
+ long extraInputBytes, int extraInputRecords,
+ long extraOutputBytes, int extraOutputRecords) throws Exception {
+
+ long[] runInputRecords = new long[runTasks.length];
+ long[] runInputBytes = new long[runTasks.length];
+ long[] runOutputRecords = new long[runTasks.length];
+ long[] runOutputBytes = new long[runTasks.length];
+ long[] specInputRecords = new long[runTasks.length];
+ long[] specInputBytes = new long[runTasks.length];
+ long[] specOutputRecords = new long[runTasks.length];
+ long[] specOutputBytes = new long[runTasks.length];
+
+ for (int i = 0; i < runTasks.length; ++i) {
+ final TaskInfo specInfo;
+ final Counters counters = runTasks[i].getCounters();
+ switch (type) {
+ case MAP:
+ runInputBytes[i] = counters.findCounter("FileSystemCounters",
+ "HDFS_BYTES_READ").getValue();
+ runInputRecords[i] =
+ (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
+ runOutputBytes[i] =
+ counters.findCounter(MAP_OUTPUT_BYTES).getValue();
+ runOutputRecords[i] =
+ (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
+
+ specInfo = spec.getTaskInfo(TaskType.MAP, i);
+ specInputRecords[i] = specInfo.getInputRecords();
+ specInputBytes[i] = specInfo.getInputBytes();
+ specOutputRecords[i] = specInfo.getOutputRecords();
+ specOutputBytes[i] = specInfo.getOutputBytes();
+ System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+ specInputBytes[i], specOutputBytes[i],
+ specInputRecords[i], specOutputRecords[i]);
+ System.out.printf(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
+ runInputBytes[i], runOutputBytes[i],
+ runInputRecords[i], runOutputRecords[i]);
+ break;
+ case REDUCE:
+ runInputBytes[i] = 0;
+ runInputRecords[i] =
+ (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+ runOutputBytes[i] =
+ counters.findCounter("FileSystemCounters",
+ "HDFS_BYTES_WRITTEN").getValue();
+ runOutputRecords[i] =
+ (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
+
+ specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+ // There is no reliable counter for reduce input bytes. The
+ // variable-length encoding of intermediate records and other noise
+ // make this quantity difficult to estimate. The shuffle and spec
+ // input bytes are included in debug output for reference, but are
+ // not checked
+ specInputBytes[i] = 0;
+ specInputRecords[i] = specInfo.getInputRecords();
+ specOutputRecords[i] = specInfo.getOutputRecords();
+ specOutputBytes[i] = specInfo.getOutputBytes();
+ System.out.printf(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
+ specInfo.getInputBytes(), specOutputBytes[i],
+ specInputRecords[i], specOutputRecords[i]);
+ System.out.printf(type + " RUN: (%9d) -> %9d :: %5d -> %5d\n",
+ counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue(),
+ runOutputBytes[i], runInputRecords[i], runOutputRecords[i]);
+ break;
+ default:
+ specInfo = null;
+ fail("Unexpected type: " + type);
+ }
+ }
+
+ // Check input bytes
+ Arrays.sort(specInputBytes);
+ Arrays.sort(runInputBytes);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched " + type + " input bytes " +
+ specInputBytes[i] + "/" + runInputBytes[i],
+ eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
+ }
+
+ // Check input records
+ Arrays.sort(specInputRecords);
+ Arrays.sort(runInputRecords);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched " + type + " input records " +
+ specInputRecords[i] + "/" + runInputRecords[i],
+ eqPlusMinus(runInputRecords[i], specInputRecords[i],
+ extraInputRecords));
+ }
+
+ // Check output bytes
+ Arrays.sort(specOutputBytes);
+ Arrays.sort(runOutputBytes);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched " + type + " output bytes " +
+ specOutputBytes[i] + "/" + runOutputBytes[i],
+ eqPlusMinus(runOutputBytes[i], specOutputBytes[i],
+ extraOutputBytes));
+ }
+
+ // Check output records
+ Arrays.sort(specOutputRecords);
+ Arrays.sort(runOutputRecords);
+ for (int i = 0; i < runTasks.length; ++i) {
+ assertTrue("Mismatched " + type + " output records " +
+ specOutputRecords[i] + "/" + runOutputRecords[i],
+ eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
+ extraOutputRecords));
+ }
+
+ }
+
+ private static boolean eqPlusMinus(long a, long b, long x) {
+ final long diff = Math.abs(a - b);
+ return diff <= x;
+ }
+
+ @Override
+ protected void onSuccess(Job job) {
+ retiredJobs.add(job);
+ }
+ @Override
+ protected void onFailure(Job job) {
+ fail("Job failure: " + job);
+ }
+ }
+
+ static class DebugGridmix extends Gridmix {
+
+ private DebugJobFactory factory;
+ private TestMonitor monitor;
+
+ public void checkMonitor() throws Exception {
+ monitor.verify(factory.getSubmitted());
+ }
+
+ @Override
+ protected JobMonitor createJobMonitor() {
+ monitor = new TestMonitor(NJOBS + 1); // include data generation job
+ return monitor;
+ }
+
+ @Override
+ protected JobFactory createJobFactory(JobSubmitter submitter,
+ String traceIn, Path scratchDir, Configuration conf,
+ CountDownLatch startFlag) throws IOException {
+ factory =
+ new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag);
+ return factory;
+ }
+ }
+
+ @Test
+ public void testSubmit() throws Exception {
+ final Path in = new Path("foo").makeQualified(dfs);
+ final Path out = new Path("/gridmix").makeQualified(dfs);
+ final String[] argv = {
+ "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
+ "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
+ "-generate", String.valueOf(GENDATA) + "m",
+ in.toString(),
+ "-" // ignored by DebugGridmix
+ };
+ DebugGridmix client = new DebugGridmix();
+ final Configuration conf = mrCluster.createJobConf();
+ //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
+ int res = ToolRunner.run(conf, client, argv);
+ assertEquals("Client exited with nonzero status", 0, res);
+ client.checkMonitor();
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRecordFactory.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,79 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.Random;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class TestRecordFactory {
+ private static final Log LOG = LogFactory.getLog(TestRecordFactory.class);
+
+ public static void testFactory(long targetBytes, long targetRecs)
+ throws Exception {
+ final Configuration conf = new Configuration();
+ final GridmixKey key = new GridmixKey();
+ final GridmixRecord val = new GridmixRecord();
+ LOG.info("Target bytes/records: " + targetBytes + "/" + targetRecs);
+ final RecordFactory f = new AvgRecordFactory(targetBytes, targetRecs, conf);
+ targetRecs = targetRecs <= 0 && targetBytes >= 0
+ ? Math.max(1,
+ targetBytes / conf.getInt("gridmix.missing.rec.size", 64 * 1024))
+ : targetRecs;
+
+ long records = 0L;
+ final DataOutputBuffer out = new DataOutputBuffer();
+ while (f.next(key, val)) {
+ ++records;
+ key.write(out);
+ val.write(out);
+ }
+ assertEquals(targetRecs, records);
+ assertEquals(targetBytes, out.getLength());
+ }
+
+ @Test
+ public void testRandom() throws Exception {
+ final Random r = new Random();
+ final long targetBytes = r.nextInt(1 << 20) + 3 * (1 << 14);
+ final long targetRecs = r.nextInt(1 << 14);
+ testFactory(targetBytes, targetRecs);
+ }
+
+ @Test
+ public void testAvg() throws Exception {
+ final Random r = new Random();
+ final long avgsize = r.nextInt(1 << 10) + 1;
+ final long targetRecs = r.nextInt(1 << 14);
+ testFactory(targetRecs * avgsize, targetRecs);
+ }
+
+ @Test
+ public void testZero() throws Exception {
+ final Random r = new Random();
+ final long targetBytes = r.nextInt(1 << 20);
+ testFactory(targetBytes, 0);
+ }
+}
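When testFactory is called with a zero record target, the expected count above falls back to the byte budget divided by the configured average record size. A worked instance of that arithmetic, assuming the gridmix.missing.rec.size default of 64 KiB read by the code above:

    // targetRecs == 0, targetBytes == 1 MiB:
    //   expected records = Math.max(1, (1 << 20) / (64 * 1024)) = 16
    // and the factory must emit exactly 1 MiB across those 16 records.
    testFactory(1 << 20, 0);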
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077079&r1=1077078&r2=1077079&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar 4 03:38:20 2011
@@ -63,7 +63,7 @@ abstract public class Task implements Wr
LogFactory.getLog(Task.class);
// Counters used by Task subclasses
- protected static enum Counter {
+ public static enum Counter {
MAP_INPUT_RECORDS,
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=1077079&r1=1077078&r2=1077079&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Mar 4 03:38:20 2011
@@ -33,7 +33,7 @@ import org.apache.hadoop.util.StringUtil
* not intended to be a comprehensive piece of data.
*
**************************************************/
-abstract class TaskStatus implements Writable, Cloneable {
+public abstract class TaskStatus implements Writable, Cloneable {
static final Log LOG =
LogFactory.getLog(TaskStatus.class.getName());
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/HistogramRawTestData.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class HistogramRawTestData {
+ List<Long> data = new ArrayList<Long>();
+
+ List<Integer> percentiles = new ArrayList<Integer>();
+
+ int scale;
+
+ public List<Integer> getPercentiles() {
+ return percentiles;
+ }
+
+ public void setPercentiles(List<Integer> percentiles) {
+ this.percentiles = percentiles;
+ }
+
+ public int getScale() {
+ return scale;
+ }
+
+ public void setScale(int scale) {
+ this.scale = scale;
+ }
+
+ public List<Long> getData() {
+ return data;
+ }
+
+ public void setData(List<Long> data) {
+ this.data = data;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestHistograms.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+import java.io.IOException;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestHistograms {
+
+ /**
+ * @throws IOException
+ *
+   * There should be files in the directory named by
+   * ${test.tools.input.dir}/rumen/histogram-tests .
+ *
+ * There will be pairs of files, inputXxx.json and goldXxx.json .
+ *
+ * We read the input file as a HistogramRawTestData in json. Then we
+ * create a Histogram using the data field, and then a
+ * LoggedDiscreteCDF using the percentiles and scale field. Finally,
+ * we read the corresponding goldXxx.json as a LoggedDiscreteCDF and
+ * deepCompare them.
+ */
+ @Test
+ public void testHistograms() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+ final Path rootInputDir = new Path(
+ System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
+ final Path rootInputFile = new Path(rootInputDir, "rumen/histogram-tests");
+
+
+ FileStatus[] tests = lfs.listStatus(rootInputFile);
+
+ for (int i = 0; i < tests.length; ++i) {
+ Path filePath = tests[i].getPath();
+ String fileName = filePath.getName();
+ if (fileName.startsWith("input")) {
+ String testName = fileName.substring("input".length());
+ Path goldFilePath = new Path(rootInputFile, "gold"+testName);
+ assertTrue("Gold file dies not exist", lfs.exists(goldFilePath));
+ LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs);
+ System.out.println("Testing a Histogram for " + fileName);
+ FSDataInputStream goldStream = lfs.open(goldFilePath);
+ JsonObjectMapperParser<LoggedDiscreteCDF> parser = new JsonObjectMapperParser<LoggedDiscreteCDF>(
+ goldStream, LoggedDiscreteCDF.class);
+ try {
+ LoggedDiscreteCDF dcdf = parser.getNext();
+ dcdf.deepCompare(newResult, new TreePath(null, "<root>"));
+ } catch (DeepInequalityException e) {
+ fail(e.path.toString());
+ }
+ finally {
+ parser.close();
+ }
+ }
+ }
+ }
+
+ private static LoggedDiscreteCDF histogramFileToCDF(Path path, FileSystem fs)
+ throws IOException {
+ FSDataInputStream dataStream = fs.open(path);
+ JsonObjectMapperParser<HistogramRawTestData> parser = new JsonObjectMapperParser<HistogramRawTestData>(
+ dataStream, HistogramRawTestData.class);
+ HistogramRawTestData data;
+ try {
+ data = parser.getNext();
+ } finally {
+ parser.close();
+ }
+
+ Histogram hist = new Histogram();
+ List<Long> measurements = data.getData();
+ List<Long> typeProbeData = new HistogramRawTestData().getData();
+
+ assertTrue(
+ "The data attribute of a jackson-reconstructed HistogramRawTestData "
+ + " should be a " + typeProbeData.getClass().getName()
+ + ", like a virgin HistogramRawTestData, but it's a "
+ + measurements.getClass().getName(),
+ measurements.getClass() == typeProbeData.getClass());
+
+ for (int j = 0; j < measurements.size(); ++j) {
+ hist.enter(measurements.get(j));
+ }
+
+ LoggedDiscreteCDF result = new LoggedDiscreteCDF();
+ int[] percentiles = new int[data.getPercentiles().size()];
+
+ for (int j = 0; j < data.getPercentiles().size(); ++j) {
+ percentiles[j] = data.getPercentiles().get(j);
+ }
+
+ result.setCDF(hist, percentiles, data.getScale());
+ return result;
+ }
+
+ public static void main(String[] args) throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ for (String arg : args) {
+ Path filePath = new Path(arg).makeQualified(lfs);
+ String fileName = filePath.getName();
+ if (fileName.startsWith("input")) {
+ LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs);
+ String testName = fileName.substring("input".length());
+ Path goldFilePath = new Path(filePath.getParent(), "gold"+testName);
+
+ ObjectMapper mapper = new ObjectMapper();
+ JsonFactory factory = mapper.getJsonFactory();
+ FSDataOutputStream ostream = lfs.create(goldFilePath, true);
+ JsonGenerator gen = factory.createJsonGenerator(ostream,
+ JsonEncoding.UTF8);
+ gen.useDefaultPrettyPrinter();
+
+ gen.writeObject(newResult);
+
+ gen.close();
+ } else {
+ System.err.println("Input file not started with \"input\". File "+fileName+" skipped.");
+ }
+ }
+ }
+}
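
A note on the data format exercised above: each inputXxx.json is read as a
HistogramRawTestData, whose bean properties (getData(), getPercentiles(),
getScale()) suggest JSON fields named "data", "percentiles", and "scale"
under the default Jackson mapping. No input file appears in this hunk, so
the following is only a plausible sketch, not a file from this commit:

    {
      "data" : [ 12345, 2345678901, 2345678902, 23456789012, 23456789012 ],
      "percentiles" : [ 25, 50, 75 ],
      "scale" : 100
    }

Fed through histogramFileToCDF, a file like this would produce a
LoggedDiscreteCDF with numberValues 5 and rankings at 0.25, 0.5 and 0.75;
the exact datum at each ranking depends on Histogram's bucketing.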
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestPiecewiseLinearInterpolation {
+
+ static private double maximumRelativeError = 0.002D;
+
+ static private LoggedSingleRelativeRanking makeRR(double ranking, long datum) {
+ LoggedSingleRelativeRanking result = new LoggedSingleRelativeRanking();
+
+ result.setDatum(datum);
+ result.setRelativeRanking(ranking);
+
+ return result;
+ }
+
+ @Test
+ public void testOneRun() {
+ LoggedDiscreteCDF input = new LoggedDiscreteCDF();
+
+ input.setMinimum(100000L);
+ input.setMaximum(1100000L);
+
+ ArrayList<LoggedSingleRelativeRanking> rankings = new ArrayList<LoggedSingleRelativeRanking>();
+
+ rankings.add(makeRR(0.1, 200000L));
+ rankings.add(makeRR(0.5, 800000L));
+ rankings.add(makeRR(0.9, 1000000L));
+
+ input.setRankings(rankings);
+ input.setNumberValues(3);
+
+ CDFRandomGenerator gen = new CDFPiecewiseLinearRandomGenerator(input);
+ Histogram values = new Histogram();
+
+ for (int i = 0; i < 1000000; ++i) {
+ long value = gen.randomValue();
+ values.enter(value);
+ }
+
+ /*
+ * Now we build a percentiles CDF, and compute the sum of the squares of
+ * the actual percentiles vs. the predicted percentiles
+ */
+ int[] percentiles = new int[99];
+
+ for (int i = 0; i < 99; ++i) {
+ percentiles[i] = i + 1;
+ }
+
+ long[] result = values.getCDF(100, percentiles);
+ long sumErrorSquares = 0L;
+
+ for (int i = 0; i < 10; ++i) {
+ long error = result[i] - (10000L * i + 100000L);
+ System.out.println("element " + i + ", got " + result[i] + ", expected "
+ + (10000L * i + 100000L) + ", error = " + error);
+ sumErrorSquares += error * error;
+ }
+
+ for (int i = 10; i < 50; ++i) {
+ long error = result[i] - (15000L * i + 50000L);
+ System.out.println("element " + i + ", got " + result[i] + ", expected "
+ + (15000L * i + 50000L) + ", error = " + error);
+ sumErrorSquares += error * error;
+ }
+
+ for (int i = 50; i < 90; ++i) {
+ long error = result[i] - (5000L * i + 550000L);
+ System.out.println("element " + i + ", got " + result[i] + ", expected "
+ + (5000L * i + 550000L) + ", error = " + error);
+ sumErrorSquares += error * error;
+ }
+
+ for (int i = 90; i <= 100; ++i) {
+ long error = result[i] - (10000L * i + 100000L);
+ System.out.println("element " + i + ", got " + result[i] + ", expected "
+ + (10000L * i + 100000L) + ", error = " + error);
+ sumErrorSquares += error * error;
+ }
+
+ // normalize the error
+ double realSumErrorSquares = (double) sumErrorSquares;
+
+ double normalizedError = realSumErrorSquares / 100
+ / rankings.get(1).getDatum() / rankings.get(1).getDatum();
+ double RMSNormalizedError = Math.sqrt(normalizedError);
+
+ System.out.println("sumErrorSquares = " + sumErrorSquares);
+
+ System.out.println("normalizedError: " + normalizedError
+ + ", RMSNormalizedError: " + RMSNormalizedError);
+
+ System.out.println("Cumulative error is " + RMSNormalizedError);
+
+ assertTrue("The RMS relative error per bucket, " + RMSNormalizedError
+ + ", exceeds our tolerance of " + maximumRelativeError,
+ RMSNormalizedError <= maximumRelativeError);
+
+ }
+}
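
The four expectation formulas in testOneRun are just linear interpolation
between the CDF knots (0%, 100000), (10%, 200000), (50%, 800000),
(90%, 1000000) and (100%, 1100000). A minimal sketch of the function the
loops encode (not part of this commit, derived only from the loop bounds
above):

    static long expectedPercentileValue(int i) {
      if (i < 10) return 10000L * i + 100000L; // slope (200000-100000)/10
      if (i < 50) return 15000L * i + 50000L;  // slope (800000-200000)/40
      if (i < 90) return 5000L * i + 550000L;  // slope (1000000-800000)/40
      return 10000L * i + 100000L;             // slope (1100000-1000000)/10
    }

For example, at i = 50 the second and third formulas agree at 800000, the
datum of the middle ranking.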
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestRumenJobTraces {
+ @Test
+ public void testSmallTrace() throws Exception {
+ performSingleTest("sample-job-tracker-logs.gz",
+ "job-tracker-logs-topology-output", "job-tracker-logs-trace-output.gz");
+ }
+
+ @Test
+ public void testTruncatedTask() throws Exception {
+ performSingleTest("truncated-job-tracker-log", "truncated-topology-output",
+ "truncated-trace-output");
+ }
+
+ private void performSingleTest(String jtLogName, String goldTopology,
+ String goldTrace) throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", ""))
+ .makeQualified(lfs);
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"))
+ .makeQualified(lfs);
+
+ final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
+ final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
+ lfs.delete(tempDir, true);
+
+ final Path topologyFile = new Path(tempDir, jtLogName + "-topology.json");
+ final Path traceFile = new Path(tempDir, jtLogName + "-trace.json");
+
+ final Path inputFile = new Path(rootInputFile, jtLogName);
+
+ System.out.println("topology result file = " + topologyFile);
+ System.out.println("trace result file = " + traceFile);
+
+ String[] args = new String[6];
+
+ args[0] = "-v1";
+
+ args[1] = "-write-topology";
+ args[2] = topologyFile.toString();
+
+ args[3] = "-write-job-trace";
+ args[4] = traceFile.toString();
+
+ args[5] = inputFile.toString();
+
+ final Path topologyGoldFile = new Path(rootInputFile, goldTopology);
+ final Path traceGoldFile = new Path(rootInputFile, goldTrace);
+
+ HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
+ int result = ToolRunner.run(analyzer, args);
+ assertEquals("Non-zero exit", 0, result);
+
+ TestRumenJobTraces.<LoggedNetworkTopology>jsonFileMatchesGold(lfs,
+ topologyFile, topologyGoldFile, LoggedNetworkTopology.class, "topology");
+ TestRumenJobTraces.<LoggedJob>jsonFileMatchesGold(lfs, traceFile,
+ traceGoldFile, LoggedJob.class, "trace");
+ }
+
+ static private <T extends DeepCompare> void jsonFileMatchesGold(
+ FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
+ String fileDescription) throws IOException {
+ JsonObjectMapperParser<T> goldParser =
+ new JsonObjectMapperParser<T>(gold, clazz, new Configuration());
+ InputStream resultStream = lfs.open(result);
+ JsonObjectMapperParser<T> resultParser =
+ new JsonObjectMapperParser<T>(resultStream, clazz);
+ try {
+ while (true) {
+ DeepCompare goldJob = goldParser.getNext();
+ DeepCompare resultJob = resultParser.getNext();
+ if ((goldJob == null) || (resultJob == null)) {
+ assertTrue(goldJob == resultJob);
+ break;
+ }
+
+ try {
+ resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
+ } catch (DeepInequalityException e) {
+ fail(fileDescription + " mismatches: " + e.path.toString());
+ }
+ }
+ } finally {
+ IOUtils.cleanup(null, goldParser, resultParser);
+ }
+ }
+}
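
performSingleTest drives HadoopLogsAnalyzer through ToolRunner with the same
arguments a user would pass on the command line. Assuming the tool is
launched via the hadoop script with its fully-qualified class name (an
assumption; this commit does not show how the class is wired into the CLI),
the equivalent manual invocation would be roughly:

    hadoop org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer -v1 \
        -write-topology topology.json \
        -write-job-trace trace.json \
        sample-job-tracker-logs.gz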
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestZombieJob.java Fri Mar 4 03:38:20 2011
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestZombieJob {
+ final double epsilon = 0.01;
+ private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 };
+ private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 };
+ private long[] failedCDF = new long[] { 18592, 18592, 18592, 18592, 18592 };
+ private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182,
+ 0.0013027618551328818, 2.605523710265763E-4 };
+
+ private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L,
+ 1564L, 1234L };
+ private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L,
+ 1514383L, 139391L, 1234L };
+
+ List<LoggedJob> loggedJobs = new ArrayList<LoggedJob>();
+ List<JobStory> jobStories = new ArrayList<JobStory>();
+
+ @Before
+ public void setUp() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootInputDir = new Path(
+ System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
+ final Path rootInputFile = new Path(rootInputDir, "rumen/zombie");
+
+ ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile,
+ "input-trace.json"), new ZombieCluster(new Path(rootInputFile,
+ "input-topology.json"), null, conf), conf);
+
+ JobStory job = null;
+ for (int i = 0; i < 4; i++) {
+ job = parser.getNextJob();
+ ZombieJob zJob = (ZombieJob) job;
+ LoggedJob loggedJob = zJob.getLoggedJob();
+ System.out.println(i + ":" + job.getNumberMaps() + "m, "
+ + job.getNumberReduces() + "r");
+ System.out
+ .println(loggedJob.getOutcome() + ", " + loggedJob.getJobtype());
+
+ System.out.println("Input Splits -- " + job.getInputSplits().length
+ + ", " + job.getNumberMaps());
+
+ System.out.println("Successful Map CDF -------");
+ for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) {
+ System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+ + "--" + cdf.getMaximum());
+ for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+ System.out.println(" " + ranking.getRelativeRanking() + ":"
+ + ranking.getDatum());
+ }
+ }
+ System.out.println("Failed Map CDF -----------");
+ for (LoggedDiscreteCDF cdf : loggedJob.getFailedMapAttemptCDFs()) {
+ System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+ + "--" + cdf.getMaximum());
+ for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+ System.out.println(" " + ranking.getRelativeRanking() + ":"
+ + ranking.getDatum());
+ }
+ }
+ System.out.println("Successful Reduce CDF ----");
+ LoggedDiscreteCDF cdf = loggedJob.getSuccessfulReduceAttemptCDF();
+ System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ + cdf.getMaximum());
+ for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+ System.out.println(" " + ranking.getRelativeRanking() + ":"
+ + ranking.getDatum());
+ }
+ System.out.println("Failed Reduce CDF --------");
+ cdf = loggedJob.getFailedReduceAttemptCDF();
+ System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ + cdf.getMaximum());
+ for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+ System.out.println(" " + ranking.getRelativeRanking() + ":"
+ + ranking.getDatum());
+ }
+ System.out.print("map attempts to success -- ");
+ for (double p : loggedJob.getMapperTriesToSucceed()) {
+ System.out.print(p + ", ");
+ }
+ System.out.println();
+ System.out.println("===============");
+
+ loggedJobs.add(loggedJob);
+ jobStories.add(job);
+ }
+ }
+
+ @Test
+ public void testFirstJob() {
+ // 20th job seems reasonable: "totalMaps":329,"totalReduces":101
+ // successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown
+ // failed map: 0-0-0-1
+ // successful reduce: 99, failed reduce: 13
+ // map attempts to success -- 0.9969879518072289, 0.0030120481927710845
+ JobStory job = jobStories.get(0);
+ assertEquals(1, job.getNumberMaps());
+ assertEquals(1, job.getNumberReduces());
+
+ // get splits
+
+ TaskAttemptInfo taInfo = null;
+ long expectedRuntime = 2423;
+ // get a succeeded map task attempt, expect the exact same task attempt
+ taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
+ assertEquals(expectedRuntime, taInfo.getRuntime());
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+ // get a succeeded map attempt, but reschedule with different locality.
+ taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 2);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+ taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 0);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+ expectedRuntime = 97502;
+ // get a succeeded reduce task attempt, expect the exact same task attempt
+ taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
+ assertEquals(expectedRuntime, taInfo.getRuntime());
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+ // fetching the same reduce task attempt again should give the same result
+ taInfo = job.getTaskAttemptInfo(TaskType.REDUCE, 14, 0);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+ // get a non-exist reduce task attempt, expect a made-up task attempt
+ // TODO fill in test case
+ }
+
+ @Test
+ public void testSecondJob() {
+ // 7th job has many failed tasks.
+ // 3204 m, 0 r
+ // successful maps 497-586-23-1, failed maps 0-0-0-2714
+ // map attempts to success -- 0.8113600833767587, 0.18707660239708182,
+ // 0.0013027618551328818, 2.605523710265763E-4,
+ JobStory job = jobStories.get(1);
+ assertEquals(20, job.getNumberMaps());
+ assertEquals(1, job.getNumberReduces());
+
+ TaskAttemptInfo taInfo = null;
+ // get a succeeded map task attempt
+ taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 1);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+ // get a succeeded map task attempt, with different locality
+ taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 2);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+ taInfo = job.getMapTaskAttemptInfoAdjusted(17, 1, 0);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+ // get another succeeded map task attempt and check its runtime
+ taInfo = job.getMapTaskAttemptInfoAdjusted(14, 0, 1);
+ assertEquals(1927, taInfo.getRuntime());
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+
+ // get a failed map task attempt, with different locality
+ // TODO: this test cannot be written yet, because no suitable data set
+ // is available.
+ }
+
+ @Test
+ public void testFourthJob() {
+ // the fourth job in the trace has 131 maps and 47 reduces
+ JobStory job = jobStories.get(3);
+ assertEquals(131, job.getNumberMaps());
+ assertEquals(47, job.getNumberReduces());
+
+ TaskAttemptInfo taInfo = null;
+ // get a succeeded map task attempt
+ long runtime = 5268;
+ taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 1);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+ assertEquals(runtime, taInfo.getRuntime());
+
+ // get a succeeded map task attempt, with different locality
+ taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 2);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+ assertEquals(runtime, taInfo.getRuntime() / 2);
+ taInfo = job.getMapTaskAttemptInfoAdjusted(113, 1, 0);
+ assertEquals(State.SUCCEEDED, taInfo.getRunState());
+ assertEquals((long) (runtime / 1.5), taInfo.getRuntime());
+
+ // get a failed map task attempt
+ taInfo = job.getMapTaskAttemptInfoAdjusted(113, 0, 1);
+ assertEquals(18592, taInfo.getRuntime());
+ assertEquals(State.FAILED, taInfo.getRunState());
+ }
+
+ @Test
+ public void testRecordIOInfo() {
+ JobStory job = jobStories.get(3);
+
+ TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113);
+
+ TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0);
+
+ assertEquals(mapTaskCounts[0], mapTask.getInputBytes());
+ assertEquals(mapTaskCounts[1], mapTask.getInputRecords());
+ assertEquals(mapTaskCounts[2], mapTask.getOutputBytes());
+ assertEquals(mapTaskCounts[3], mapTask.getOutputRecords());
+ assertEquals(mapTaskCounts[4], mapTask.getTaskMemory());
+
+ assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes());
+ assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords());
+ assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes());
+ assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords());
+ assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory());
+ }
+
+ @Test
+ public void testMakeUpInfo() {
+ // sample made-up (synthesized) task attempts many times for task 113
+ // of job 3 and check the resulting distributions
+ checkMakeUpTask(jobStories.get(3), 113, 1);
+ }
+
+ private void checkMakeUpTask(JobStory job, int taskNumber, int locality) {
+ TaskAttemptInfo taInfo = null;
+
+ Histogram sampleSucceeded = new Histogram();
+ Histogram sampleFailed = new Histogram();
+ List<Integer> sampleAttempts = new ArrayList<Integer>();
+ for (int i = 0; i < 100000; i++) {
+ int attemptId = 0;
+ while (true) {
+ taInfo = job.getMapTaskAttemptInfoAdjusted(taskNumber, attemptId, locality);
+ if (taInfo.getRunState() == State.SUCCEEDED) {
+ sampleSucceeded.enter(taInfo.getRuntime());
+ break;
+ }
+ sampleFailed.enter(taInfo.getRuntime());
+ attemptId++;
+ }
+ sampleAttempts.add(attemptId);
+ }
+
+ // check state distribution
+ int[] countTries = new int[] { 0, 0, 0, 0 };
+ for (int attempts : sampleAttempts) {
+ assertTrue(attempts < 4);
+ countTries[attempts]++;
+ }
+ // System.out.print("Generated map attempts to success -- ");
+ // for (int count : countTries) {
+ //   System.out.print((double) count / sampleAttempts.size() + ", ");
+ // }
+ // System.out.println();
+ // System.out.println("===============");
+ for (int i = 0; i < 4; i++) {
+ int count = countTries[i];
+ double p = (double) count / sampleAttempts.size();
+ assertTrue(expectedPs[i] - p < epsilon);
+ }
+
+ // check succeeded attempts runtime distribution
+ long[] expectedCDF = succeededCDF;
+ LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
+ cdf.setCDF(sampleSucceeded, attemptTimesPercentiles, 100);
+ // System.out.println("generated succeeded map runtime distribution");
+ // System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum()
+ //   + "--" + cdf.getMaximum());
+ // for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+ //   System.out.println("   " + ranking.getRelativeRanking() + ":"
+ //     + ranking.getDatum());
+ // }
+ assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
+ assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
+ for (int i = 0; i < 3; i++) {
+ LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
+ assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
+ }
+
+ // check failed attempts runtime distribution
+ expectedCDF = failedCDF;
+ cdf = new LoggedDiscreteCDF();
+ cdf.setCDF(sampleFailed, attemptTimesPercentiles, 100);
+
+ System.out.println("generated failed map runtime distribution");
+ System.out.println(cdf.getNumberValues() + ": " + cdf.getMinimum() + "--"
+ + cdf.getMaximum());
+ for (LoggedSingleRelativeRanking ranking : cdf.getRankings()) {
+ System.out.println(" " + ranking.getRelativeRanking() + ":"
+ + ranking.getDatum());
+ }
+ assertRuntimeEqual(cdf.getMinimum(), expectedCDF[0]);
+ assertRuntimeEqual(cdf.getMaximum(), expectedCDF[4]);
+ for (int i = 0; i < 3; i++) {
+ LoggedSingleRelativeRanking ranking = cdf.getRankings().get(i);
+ assertRuntimeEqual(expectedCDF[i + 1], ranking.getDatum());
+ }
+ }
+
+ private void assertRuntimeEqual(long expected, long generated) {
+ if (expected == 0) {
+ assertTrue(generated > -1000 && generated < 1000);
+ } else {
+ long epsilon = Math.max(expected / 10, 5000);
+ assertTrue(expected - generated > -epsilon);
+ assertTrue(expected - generated < epsilon);
+ }
+ }
+
+}
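
checkMakeUpTask above follows a sample-then-summarize pattern: draw many
synthesized attempts, accumulate their runtimes in a Histogram, then reduce
the Histogram to a LoggedDiscreteCDF for comparison. A minimal
self-contained sketch of that pattern (the synthetic runtimes and the
java.util.Random source are illustrative, not data from this commit):

    Histogram sample = new Histogram();
    java.util.Random rng = new java.util.Random(42L);
    for (int i = 0; i < 100000; i++) {
      // stand-in for taInfo.getRuntime() from a synthesized attempt
      sample.enter(5000L + rng.nextInt(1000));
    }
    LoggedDiscreteCDF cdf = new LoggedDiscreteCDF();
    cdf.setCDF(sample, new int[] { 10, 50, 90 }, 100);
    System.out.println(cdf.getMinimum() + " -- " + cdf.getMaximum());

The tolerance used when comparing such distributions is the one encoded in
assertRuntimeEqual: the larger of 10% of the expected value and 5000 ms,
with a flat +/-1000 ms band when the expected value is zero.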
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-minimal.json Fri Mar 4 03:38:20 2011
@@ -0,0 +1,15 @@
+{
+ "minimum" : 12345,
+ "rankings" : [ {
+ "relativeRanking" : 0.25,
+ "datum" : 12345
+ }, {
+ "relativeRanking" : 0.5,
+ "datum" : 2345678901
+ }, {
+ "relativeRanking" : 0.75,
+ "datum" : 2345678902
+ } ],
+ "maximum" : 23456789012,
+ "numberValues" : 5
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-one-value-many-repeats.json Fri Mar 4 03:38:20 2011
@@ -0,0 +1,15 @@
+{
+ "minimum" : 23456789012,
+ "rankings" : [ {
+ "relativeRanking" : 0.25,
+ "datum" : 23456789012
+ }, {
+ "relativeRanking" : 0.5,
+ "datum" : 23456789012
+ }, {
+ "relativeRanking" : 0.75,
+ "datum" : 23456789012
+ } ],
+ "maximum" : 23456789012,
+ "numberValues" : 64
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json?rev=1077079&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/tools/data/rumen/histogram-tests/gold-only-one-value.json Fri Mar 4 03:38:20 2011
@@ -0,0 +1,15 @@
+{
+ "minimum" : 23456789012,
+ "rankings" : [ {
+ "relativeRanking" : 0.25,
+ "datum" : 23456789012
+ }, {
+ "relativeRanking" : 0.5,
+ "datum" : 23456789012
+ }, {
+ "relativeRanking" : 0.75,
+ "datum" : 23456789012
+ } ],
+ "maximum" : 23456789012,
+ "numberValues" : 1
+}
\ No newline at end of file
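
gold-only-one-value.json and gold-one-value-many-repeats.json differ only in
numberValues (1 versus 64): a constant distribution collapses to the same
degenerate CDF no matter how many times the value repeats. Assuming the
input field names sketched earlier for HistogramRawTestData, the companion
input for the single-value case would plausibly be:

    {
      "data" : [ 23456789012 ],
      "percentiles" : [ 25, 50, 75 ],
      "scale" : 100
    }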