Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC
svn commit: r1495297 [39/46] - in /hadoop/common/branches/branch-1-win: ./
bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/
src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerActionCleanup.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestTaskTrackerActionCleanup extends TestCase {
+ String jtIdentifier = "201210122331";
+ TestTaskTracker tt = null;
+
+ @Before
+ public void setUp() {
+ tt = new TestTaskTracker();
+ }
+
+ @Test
+ // check duplicate entries do not make it to the queue
+ public void testDuplicateEntries() throws InterruptedException {
+
+ KillJobAction action = new KillJobAction();
+ // add the action twice
+ tt.addActionToCleanup(action);
+ tt.addActionToCleanup(action);
+
+ checkItemCountInQueue(tt, 1, 1, 0);
+ }
+
+ @Test
+ // test to make sure all tasks with localizing jobs are added to another queue
+ public void testLocalizingJobActions() throws InterruptedException,
+ IOException {
+
+ // job and attempt ids
+ JobID jobId1 = new JobID(jtIdentifier, 1);
+ JobID jobId2 = new JobID(jtIdentifier, 2);
+
+ TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+ 1);
+ TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+ 1);
+
+ // job actions which are localizing
+ KillJobAction jAction1 = new KillJobAction(jobId1);
+ RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+ rjob1.localizing = true;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId1, rjob1);
+ tt.addActionToCleanup(jAction1);
+
+ KillJobAction jAction2 = new KillJobAction(jobId2);
+ RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+ rjob2.localizing = true;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId2, rjob2);
+ tt.addActionToCleanup(jAction2);
+
+ // task action which is localizing
+ KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+ RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob3.getJobID(), rjob3);
+ rjob3.localizing = true;
+ tt.addActionToCleanup(tAction1);
+
+ KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+ RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+ rjob4.localizing = true;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob4.getJobID(), rjob4);
+ tt.addActionToCleanup(tAction2);
+
+ // before the task cleanup, check the queue
+ checkItemCountInQueue(tt, 4, 4, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after one round the first task gets moved to the localizing list
+ checkItemCountInQueue(tt, 4, 3, 1);
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 2nd task gets moved to the localizing list
+ checkItemCountInQueue(tt, 4, 2, 2);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 3rd task gets moved to the localizing list
+ checkItemCountInQueue(tt, 4, 1, 3);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 4th task gets moved to the localizing list
+ checkItemCountInQueue(tt, 4, 0, 4);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // this is a no-op; there is a sleep here but we can't really test it
+ checkItemCountInQueue(tt, 4, 0, 4);
+ }
+
+ @Test
+ // test when all items in the queue are active
+ public void testAllActiveJobActions() throws InterruptedException,
+ IOException {
+ // job and attempt ids
+ JobID jobId1 = new JobID(jtIdentifier, 1);
+ JobID jobId2 = new JobID(jtIdentifier, 2);
+
+ TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+ 1);
+ TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+ 1);
+
+ // job actions which are not localizing
+ KillJobAction jAction1 = new KillJobAction(jobId1);
+ RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+ rjob1.localizing = false;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId1, rjob1);
+ tt.addActionToCleanup(jAction1);
+
+ KillJobAction jAction2 = new KillJobAction(jobId2);
+ RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+ rjob2.localizing = false;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId2, rjob2);
+ tt.addActionToCleanup(jAction2);
+
+ // task action which is not localizing
+ KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+ RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob3.getJobID(), rjob3);
+ rjob3.localizing = false;
+ tt.addActionToCleanup(tAction1);
+
+ KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+ RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+ rjob4.localizing = false;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob4.getJobID(), rjob4);
+ tt.addActionToCleanup(tAction2);
+
+ // before the task cleanup, check the queue
+ checkItemCountInQueue(tt, 4, 4, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after one round the first task gets killed
+ checkItemCountInQueue(tt, 3, 3, 0);
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 2nd task gets killed
+ checkItemCountInQueue(tt, 2, 2, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 3rd task gets killed
+ checkItemCountInQueue(tt, 1, 1, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 4th task gets killed
+ checkItemCountInQueue(tt, 0, 0, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // this is a no-op; there is a sleep here but we can't really test it
+ checkItemCountInQueue(tt, 0, 0, 0);
+ }
+
+ @Test
+ // test when some items in the queue are localizing and some are not
+ public void testMixedJobActions() throws InterruptedException, IOException {
+ // job and attempt ids
+ JobID jobId1 = new JobID(jtIdentifier, 1);
+ JobID jobId2 = new JobID(jtIdentifier, 2);
+
+ TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+ 1);
+ TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+ 1);
+
+ // job actions, one localizing and one not
+ KillJobAction jAction1 = new KillJobAction(jobId1);
+ RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+ rjob1.localizing = true;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId1, rjob1);
+ tt.addActionToCleanup(jAction1);
+
+ KillJobAction jAction2 = new KillJobAction(jobId2);
+ RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+ rjob2.localizing = false;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId2, rjob2);
+ tt.addActionToCleanup(jAction2);
+
+ // task action which is localizing
+ KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+ RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob3.getJobID(), rjob3);
+ rjob3.localizing = true;
+ tt.addActionToCleanup(tAction1);
+
+ KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+ RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+ rjob4.localizing = false;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob4.getJobID(), rjob4);
+ tt.addActionToCleanup(tAction2);
+
+ // before the task cleanup, check the queue
+ checkItemCountInQueue(tt, 4, 4, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // since the first attempt is localizing it will move to the
+ // localizing queue
+ checkItemCountInQueue(tt, 4, 3, 1);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after the 2nd round the tip is a job that can be cleaned up
+ checkItemCountInQueue(tt, 3, 2, 1);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after the 3rd round the 3rd task is getting localized
+ checkItemCountInQueue(tt, 3, 1, 2);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 4th task gets killed
+ checkItemCountInQueue(tt, 2, 0, 2);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // nothing should change as the tasks are being localized
+ checkItemCountInQueue(tt, 2, 0, 2);
+
+ }
+
+ // test when some items in the queue are localizing and some are not,
+ // and we update the status of the actions between cleanup runs
+ @Test
+ public void testMixedJobActionsAndUpdateActions()
+ throws InterruptedException, IOException {
+ // job and attempt ids
+ JobID jobId1 = new JobID(jtIdentifier, 1);
+ JobID jobId2 = new JobID(jtIdentifier, 2);
+
+ TaskAttemptID taskAttemptId1 = new TaskAttemptID(jtIdentifier, 3, true, 1,
+ 1);
+ TaskAttemptID taskAttemptId2 = new TaskAttemptID(jtIdentifier, 4, true, 1,
+ 1);
+
+ // job actions, one localizing and one not
+ KillJobAction jAction1 = new KillJobAction(jobId1);
+ RunningJob rjob1 = new RunningJob(jAction1.getJobID());
+ rjob1.localizing = true;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId1, rjob1);
+ tt.addActionToCleanup(jAction1);
+
+ KillJobAction jAction2 = new KillJobAction(jobId2);
+ RunningJob rjob2 = new RunningJob(jAction2.getJobID());
+ rjob2.localizing = false;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(jobId2, rjob2);
+ tt.addActionToCleanup(jAction2);
+
+ // task action which is localizing
+ KillTaskAction tAction1 = new KillTaskAction(taskAttemptId1);
+ RunningJob rjob3 = new RunningJob(tAction1.getTaskID().getJobID());
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob3.getJobID(), rjob3);
+ rjob3.localizing = true;
+ tt.addActionToCleanup(tAction1);
+
+ KillTaskAction tAction2 = new KillTaskAction(taskAttemptId2);
+ RunningJob rjob4 = new RunningJob(tAction2.getTaskID().getJobID());
+ rjob4.localizing = false;
+ // add the job to the runningJobs tree
+ tt.runningJobs.put(rjob4.getJobID(), rjob4);
+ tt.addActionToCleanup(tAction2);
+
+ // before the task cleanup, check the queue
+ checkItemCountInQueue(tt, 4, 4, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // since the first attempt is localizing it will move to the
+ // localizing queue
+ checkItemCountInQueue(tt, 4, 3, 1);
+
+ // before running clean up again change the status of the first attempt to
+ // not be localizing.
+ rjob1.localizing = false;
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after the 2nd round the tip is a job that can be cleaned up, so it is
+ // cleaned, and the localizing job is no longer being localized, so it moves
+ // to the active queue
+ checkItemCountInQueue(tt, 3, 3, 0);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after the 3rd round the 3rd task is getting localized
+ // so it gets moved to the localizing queue.
+ checkItemCountInQueue(tt, 3, 2, 1);
+
+ // now run the task clean up
+ tt.taskCleanUp();
+
+ // after another round the 4th task gets killed
+ // and leaves 2 tasks: one which switched from localizing to clean
+ // and one that is being localized.
+ checkItemCountInQueue(tt, 2, 1, 1);
+ }
+
+ /*
+ * Method to test the number of items in the various data structures holding
+ * cleanup actions
+ */
+ private void checkItemCountInQueue(TaskTracker tt, int allSize,
+ int activeSize, int inactiveSize) {
+ // check the sizes of the cleanup action collections
+ assertEquals("Size of allCleanUpActions is not " + allSize, allSize,
+ tt.allCleanupActions.size());
+ assertEquals("Size of activeCleanUpActions is not " + activeSize,
+ activeSize, tt.activeCleanupActions.size());
+ assertEquals("Size of inactiveCleanUpActions is not " + inactiveSize,
+ inactiveSize, tt.inactiveCleanupActions.size());
+ }
+
+ class TestTaskTracker extends TaskTracker {
+ // override the method so it's a no-op
+ synchronized void purgeJob(KillJobAction action) throws IOException {
+ LOG.info("Received 'KillJobAction' for job: " + action.getJobID());
+ }
+
+ // override the method so it's a no-op
+ void processKillTaskAction(KillTaskAction killAction) throws IOException {
+ LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTaskTrackerVersionCheck.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.VersionInfo;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test the version check the TT performs when connecting to the JT
+ */
+public class TestTaskTrackerVersionCheck {
+
+ /**
+ * Test the default TT version checking
+ */
+ @Test
+ public void testDefaultVersionCheck() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ JobConf jtConf = new JobConf();
+ mr = new MiniMRCluster(1, "file:///", 1, null, null, jtConf);
+ TaskTracker tt = mr.getTaskTrackerRunner(0).getTaskTracker();
+ String currBuildVersion = VersionInfo.getBuildVersion();
+ String currVersion = VersionInfo.getVersion();
+
+ assertTrue(tt.isPermittedVersion(currBuildVersion, currVersion));
+ assertFalse("We disallow different versions",
+ tt.isPermittedVersion(currBuildVersion+"x", currVersion+"x"));
+ assertFalse("We disallow different full versions with same version",
+ tt.isPermittedVersion(currBuildVersion+"x", currVersion));
+ try {
+ tt.isPermittedVersion(currBuildVersion, currVersion+"x");
+ fail("Matched full version with mismatched version");
+ } catch (AssertionError ae) {
+ // Expected. The versions should always match if the full
+ // versions match as the full version contains the version.
+ }
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test the "relaxed" TT version checking
+ */
+ @Test
+ public void testRelaxedVersionCheck() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ JobConf jtConf = new JobConf();
+ jtConf.setBoolean(
+ CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY, true);
+ mr = new MiniMRCluster(1, "file:///", 1, null, null, jtConf);
+ TaskTracker tt = mr.getTaskTrackerRunner(0).getTaskTracker();
+ String currFullVersion = VersionInfo.getBuildVersion();
+ String currVersion = VersionInfo.getVersion();
+
+ assertTrue(tt.isPermittedVersion(currFullVersion, currVersion));
+ assertFalse("We disallow different versions",
+ tt.isPermittedVersion(currFullVersion+"x", currVersion+"x"));
+ assertTrue("We allow different full versions with same version",
+ tt.isPermittedVersion(currFullVersion+"x", currVersion));
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test no TT version checking
+ */
+ @Test
+ public void testNoVersionCheck() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ JobConf jtConf = new JobConf();
+ jtConf.setBoolean(
+ CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY, true);
+ mr = new MiniMRCluster(1, "file:///", 1, null, null, jtConf);
+ TaskTracker tt = mr.getTaskTrackerRunner(0).getTaskTracker();
+ String currFullVersion = VersionInfo.getBuildVersion();
+ String currVersion = VersionInfo.getVersion();
+
+ assertTrue(tt.isPermittedVersion(currFullVersion, currVersion));
+ assertTrue(tt.isPermittedVersion(currFullVersion+"x", currVersion+"x"));
+ assertTrue(tt.isPermittedVersion(currFullVersion+"x", currVersion));
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -20,16 +20,20 @@ package org.apache.hadoop.mapred;
import java.io.*;
import java.util.*;
-import junit.framework.TestCase;
-import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.ReflectionUtils;
-public class TestTextInputFormat extends TestCase {
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestTextInputFormat {
private static final Log LOG =
LogFactory.getLog(TestTextInputFormat.class.getName());
@@ -39,17 +43,20 @@ public class TestTextInputFormat extends
private static FileSystem localFs = null;
static {
try {
+ defaultConf.set("fs.default.name", "file:///");
localFs = FileSystem.getLocal(defaultConf);
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
- private static Path workDir =
- new Path(new Path(System.getProperty("test.build.data", "."), "data"),
- "TestTextInputFormat");
-
+
+ private static Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ "TestTextInputFormat").makeQualified(localFs);
+
+ @Test
public void testFormat() throws Exception {
- JobConf job = new JobConf();
+ JobConf job = new JobConf(defaultConf);
Path file = new Path(workDir, "test.txt");
// A reporter that does nothing
@@ -127,6 +134,95 @@ public class TestTextInputFormat extends
}
}
+ @Test
+ public void testSplitableCodecs() throws IOException {
+ JobConf conf = new JobConf(defaultConf);
+ int seed = new Random().nextInt();
+ // Create the codec
+ CompressionCodec codec = null;
+ try {
+ codec = (CompressionCodec)
+ ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Illegal codec!");
+ }
+ Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+ // A reporter that does nothing
+ Reporter reporter = Reporter.NULL;
+ LOG.info("seed = "+seed);
+ Random random = new Random(seed);
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+ localFs.delete(workDir, true);
+ FileInputFormat.setInputPaths(conf, workDir);
+
+ final int MAX_LENGTH = 500000;
+
+ // for a variety of lengths
+ for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 4)+1) {
+
+ LOG.info("creating; entries = " + length);
+
+ // create a file with length entries
+ Writer writer =
+ new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+ try {
+ for (int i = 0; i < length; i++) {
+ writer.write(Integer.toString(i));
+ writer.write("\n");
+ }
+ } finally {
+ writer.close();
+ }
+
+ // try splitting the file in a variety of sizes
+ TextInputFormat format = new TextInputFormat();
+ format.configure(conf);
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ for (int i = 0; i < 3; i++) {
+ int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
+ LOG.info("splitting: requesting = " + numSplits);
+ InputSplit[] splits = format.getSplits(conf, numSplits);
+ LOG.info("splitting: got = " + splits.length);
+
+ // check each split
+ BitSet bits = new BitSet(length);
+ for (int j = 0; j < splits.length; j++) {
+ LOG.debug("split["+j+"]= " + splits[j]);
+ RecordReader<LongWritable, Text> reader =
+ format.getRecordReader(splits[j], conf, reporter);
+ try {
+ int counter = 0;
+ while (reader.next(key, value)) {
+ int v = Integer.parseInt(value.toString());
+ LOG.debug("read " + v);
+
+ if (bits.get(v)) {
+ LOG.warn("conflict with " + v +
+ " in split " + j +
+ " at position "+reader.getPos());
+ }
+ assertFalse("Key in multiple partitions.", bits.get(v));
+ bits.set(v);
+ counter++;
+ }
+ if (counter > 0) {
+ LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+ } else {
+ LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
+ }
+ }
+ }
+
private static LineReader makeStream(String str) throws IOException {
return new LineReader(new ByteArrayInputStream
(str.getBytes("UTF-8")),
@@ -138,6 +234,7 @@ public class TestTextInputFormat extends
bufsz);
}
+ @Test
public void testUTF8() throws Exception {
LineReader in = makeStream("abcd\u20acbdcd\u20ac");
Text line = new Text();
@@ -156,6 +253,7 @@ public class TestTextInputFormat extends
*
* @throws Exception
*/
+ @Test
public void testNewLines() throws Exception {
final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
final int STRLENBYTES = STR.getBytes().length;
@@ -195,6 +293,7 @@ public class TestTextInputFormat extends
*
* @throws Exception
*/
+ @Test
public void testMaxLineLength() throws Exception {
final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
final int STRLENBYTES = STR.getBytes().length;
@@ -253,8 +352,9 @@ public class TestTextInputFormat extends
/**
* Test using the gzip codec for reading
*/
- public static void testGzip() throws IOException {
- JobConf job = new JobConf();
+ @Test
+ public void testGzip() throws IOException {
+ JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
localFs.delete(workDir, true);
@@ -286,8 +386,9 @@ public class TestTextInputFormat extends
/**
* Test using the gzip codec and an empty input file
*/
- public static void testGzipEmpty() throws IOException {
- JobConf job = new JobConf();
+ @Test
+ public void testGzipEmpty() throws IOException {
+ JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
localFs.delete(workDir, true);
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestUserDefinedCounters.java Fri Jun 21 06:37:27 2013
@@ -130,7 +130,7 @@ public class TestUserDefinedCounters ext
assertEquals(4,
runningJob.getCounters().getGroup("StringCounter")
.getCounter("MapRecords"));
- assertTrue(counters.getGroupNames().size() <= 51);
+ assertTrue(counters.getGroupNames().size() <= Counters.MAX_GROUP_LIMIT);
int i = 0;
while (counters.size() < Counters.MAX_COUNTER_LIMIT) {
counters.incrCounter("IncrCounter", "limit " + i, 2);
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestWebUIAuthorization.java Fri Jun 21 06:37:27 2013
@@ -20,13 +20,20 @@ package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.URL;
import java.net.HttpURLConnection;
+import java.net.URL;
import java.net.URLEncoder;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -35,20 +42,13 @@ import org.apache.hadoop.mapred.JobHisto
import org.apache.hadoop.mapred.JobHistory.TaskAttempt;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.HadoopPolicyProvider;
+import org.apache.hadoop.security.authorize.PolicyProvider;
import org.junit.Test;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
public class TestWebUIAuthorization extends ClusterMapReduceTestCase {
private static final Log LOG = LogFactory.getLog(
@@ -87,7 +87,13 @@ public class TestWebUIAuthorization exte
static int getHttpStatusCode(String urlstring, String userName,
String method) throws IOException {
LOG.info("Accessing " + urlstring + " as user " + userName);
- URL url = new URL(urlstring + "&user.name=" + userName);
+ URL url = null;
+ if (userName == null) {
+ url = new URL(urlstring);
+ } else {
+ url = new URL(urlstring + "&user.name=" + userName);
+ }
+
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod(method);
if (method.equals("POST")) {
@@ -776,7 +782,13 @@ public class TestWebUIAuthorization exte
Properties props = new Properties();
props.setProperty("hadoop.http.filter.initializers",
DummyFilterInitializer.class.getName());
- props.setProperty(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, "true");
+ props.setProperty(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
+ "true");
+ props.setProperty(PolicyProvider.POLICY_PROVIDER_CONFIG,
+ HadoopPolicyProvider.class.getName());
+ props.setProperty(
+ CommonConfigurationKeys.HADOOP_SECURITY_INSTRUMENTATION_REQUIRES_ADMIN,
+ "true");
props.setProperty(JobConf.MR_ADMINS, mrAdminUser + " " + mrAdminGroup);
startCluster(true, props);
@@ -908,9 +920,16 @@ public class TestWebUIAuthorization exte
String taskGraphServlet = jtURL + "/taskgraph?type=map&jobid="
+ jobid.toString();
validateViewJob(taskGraphServlet, "GET");
+ assertEquals("Incorrect return code for null user",
+ HttpURLConnection.HTTP_UNAUTHORIZED,
+ getHttpStatusCode(taskGraphServlet, null, "GET"));
+
taskGraphServlet = jtURL + "/taskgraph?type=reduce&jobid="
+ jobid.toString();
validateViewJob(taskGraphServlet, "GET");
+ assertEquals("Incorrect return code for null user",
+ HttpURLConnection.HTTP_UNAUTHORIZED,
+ getHttpStatusCode(taskGraphServlet, null, "GET"));
}
// validate access of jobdetails.jsp
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Jun 21 06:37:27 2013
@@ -843,6 +843,10 @@ public class UtilsForTests {
JobConf conf = new JobConf();
conf.set("mapred.job.tracker", "localhost:0");
conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ return getJobTracker(conf, qm);
+ }
+
+ static JobTracker getJobTracker(JobConf conf, QueueManager qm) {
JobTracker jt;
try {
if (qm == null) {
@@ -850,6 +854,10 @@ public class UtilsForTests {
} else {
jt = new JobTracker(conf, qm);
}
+ jt.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_ENTER);
+ jt.initializeFilesystem();
+ jt.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
+ jt.initialize();
return jt;
} catch (Exception e) {
throw new RuntimeException("Could not start jt", e);
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Fri Jun 21 06:37:27 2013
@@ -18,6 +18,9 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
import junit.framework.TestCase;
@@ -25,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -66,6 +70,7 @@ public class TestCombineFileInputFormat
final Path dir2 = new Path(inDir, "/dir2");
final Path dir3 = new Path(inDir, "/dir3");
final Path dir4 = new Path(inDir, "/dir4");
+ final Path dir5 = new Path(inDir, "/dir5");
static final int BLOCKSIZE = 1024;
static final byte[] databuf = new byte[BLOCKSIZE];
@@ -81,6 +86,24 @@ public class TestCombineFileInputFormat
return null;
}
}
+
+ /** Dummy class to extend CombineFileInputFormat. It allows
+ * non-existent files to be passed into the CombineFileInputFormat, allowing
+ * for easy testing without having to create real files.
+ */
+ private class DummyInputFormat1 extends DummyInputFormat {
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] files = getInputPaths(job);
+ FileStatus[] results = new FileStatus[files.length];
+ for (int i = 0; i < files.length; i++) {
+ Path p = files[i];
+ FileSystem fs = p.getFileSystem(job);
+ results[i] = fs.getFileStatus(p);
+ }
+ return results;
+ }
+ }
public void testSplitPlacement() throws IOException {
String namenode = null;
@@ -89,16 +112,16 @@ public class TestCombineFileInputFormat
FileSystem fileSys = null;
String testName = "TestSplitPlacement";
try {
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
- * 1) file1, just after starting the datanode on r1, with
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
+ * 1) file1 and file5, just after starting the datanode on r1, with
* a repl factor of 1, and,
* 2) file2, just after starting the datanode on r2, with
* a repl factor of 2, and,
- * 3) file3 after starting the all three datanodes, with a repl
+ * 3) file3, file4 after starting all three datanodes, with a repl
* factor of 3.
- * At the end, file1 will be present on only datanode1, file2 will be
- * present on datanode 1 and datanode2 and
- * file3 will be present on all datanodes.
+ * At the end, file1, file5 will be present on only datanode1, file2 will
+ * be present on datanode 1 and datanode2 and
+ * file3, file4 will be present on all datanodes.
*/
JobConf conf = new JobConf();
conf.setBoolean("dfs.replication.considerLoad", false);
@@ -114,6 +137,30 @@ public class TestCombineFileInputFormat
}
Path file1 = new Path(dir1 + "/file1");
writeFile(conf, file1, (short)1, 1);
+ // create another file on the same datanode
+ Path file5 = new Path(dir5 + "/file5");
+ writeFile(conf, file5, (short)1, 1);
+ // split it using a CombinedFile input format
+ DummyInputFormat inFormat = new DummyInputFormat();
+ JobConf job = new JobConf(conf);
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+ InputSplit[] splits = inFormat.getSplits(job, 1);
+ System.out.println("Made splits(Test0): " + splits.length);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test0): " + split);
+ }
+ assertEquals(splits.length, 1);
+ CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
dfs.waitActive();
@@ -122,14 +169,14 @@ public class TestCombineFileInputFormat
writeFile(conf, file2, (short)2, 2);
// split it using a CombinedFile input format
- DummyInputFormat inFormat = new DummyInputFormat();
+ inFormat = new DummyInputFormat();
inFormat.setInputPaths(conf, dir1 + "," + dir2);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
- InputSplit[] splits = inFormat.getSplits(conf, 1);
+ splits = inFormat.getSplits(conf, 1);
System.out.println("Made splits(Test1): " + splits.length);
// make sure that each split has different locations
- CombineFileSplit fileSplit = null;
+ fileSplit = null;
for (int i = 0; i < splits.length; ++i) {
fileSplit = (CombineFileSplit) splits[i];
System.out.println("File split(Test1): " + fileSplit);
@@ -439,7 +486,7 @@ public class TestCombineFileInputFormat
}
}
}
-
+
static void writeFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException {
FileSystem fileSys = FileSystem.get(conf);
@@ -447,13 +494,377 @@ public class TestCombineFileInputFormat
FSDataOutputStream stm = fileSys.create(name, true,
conf.getInt("io.file.buffer.size", 4096),
replication, (long)BLOCKSIZE);
+ writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
+ }
+
+ // Creates the gzip file and returns the FileStatus
+ static FileStatus writeGzipFile(Configuration conf, Path name,
+ short replication, int numBlocks) throws IOException {
+ FileSystem fileSys = FileSystem.get(conf);
+
+ GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
+ .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
+ writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
+ return fileSys.getFileStatus(name);
+ }
+
+ private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
+ OutputStream out, short replication, int numBlocks) throws IOException {
for (int i = 0; i < numBlocks; i++) {
- stm.write(databuf);
+ out.write(databuf);
}
- stm.close();
+ out.close();
DFSTestUtil.waitReplication(fileSys, name, replication);
}
+ public void testSplitPlacementForCompressedFiles() throws IOException {
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+ try {
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
+ * files
+ * 1) file1 and file5, just after starting the datanode on r1, with
+ * a repl factor of 1, and,
+ * 2) file2, just after starting the datanode on r2, with
+ * a repl factor of 2, and,
+ * 3) file3, file4 after starting all three datanodes, with a repl
+ * factor of 3.
+ * At the end, file1, file5 will be present on only datanode1, file2 will
+ * be present on datanode 1 and datanode2 and
+ * file3, file4 will be present on all datanodes.
+ */
+ Configuration conf = new Configuration();
+ conf.setBoolean("dfs.replication.considerLoad", false);
+ dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+ dfs.waitActive();
+
+ fileSys = dfs.getFileSystem();
+ if (!fileSys.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ Path file1 = new Path(dir1 + "/file1.gz");
+ FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
+ // create another file on the same datanode
+ Path file5 = new Path(dir5 + "/file5.gz");
+ FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
+ // split it using a CombinedFile input format
+ DummyInputFormat inFormat = new DummyInputFormat();
+ JobConf job = new JobConf(conf);
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+ InputSplit[] splits = inFormat.getSplits(job, 1);
+ System.out.println("Made splits(Test0): " + splits.length);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test0): " + split);
+ }
+ assertEquals(splits.length, 1);
+ CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f5.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+ dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+ dfs.waitActive();
+
+ // create file on two datanodes.
+ Path file2 = new Path(dir2 + "/file2.gz");
+ FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
+
+ // split it using a CombinedFile input format
+ inFormat = new DummyInputFormat();
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+ inFormat.setMinSplitSizeRack(f1.getLen());
+ splits = inFormat.getSplits(job, 1);
+ System.out.println("Made splits(Test1): " + splits.length);
+
+ // make sure that each split has different locations
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test1): " + split);
+ }
+ assertEquals(2, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // create another file on 3 datanodes and 3 racks.
+ dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+ dfs.waitActive();
+ Path file3 = new Path(dir3 + "/file3.gz");
+ FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
+ inFormat = new DummyInputFormat();
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+ inFormat.setMinSplitSizeRack(f1.getLen());
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test2): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // create file4 on all three racks
+ Path file4 = new Path(dir4 + "/file4.gz");
+ FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
+ inFormat = new DummyInputFormat();
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ inFormat.setMinSplitSizeRack(f1.getLen());
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test3): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size is file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(f1.getLen());
+ inFormat.setMaxSplitSize(f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test4): " + split);
+ }
+ assertEquals(4, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f4.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[3];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size is twice file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(f1.getLen());
+ inFormat.setMaxSplitSize(2 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test5): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size is 4 times file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(2 * f1.getLen());
+ inFormat.setMaxSplitSize(4 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test6): " + split);
+ }
+ assertEquals(2, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
+ assertEquals(f2.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size and min-split-size per rack is 4 times file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMaxSplitSize(4 * f1.getLen());
+ inFormat.setMinSplitSizeRack(4 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test7): " + split);
+ }
+ assertEquals(1, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(4, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+ // minimum split size per node is 4 times file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(4 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test8): " + split);
+ }
+ assertEquals(1, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(4, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+ // Rack 1 has file1, file2 and file3 and file4
+ // Rack 2 has file2 and file3 and file4
+ // Rack 3 has file3 and file4
+ // setup a filter so that only file1 and file2 can be combined
+ inFormat = new DummyInputFormat();
+ FileInputFormat.addInputPath(job, inDir);
+ inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+ inFormat.createPool(job, new TestFilter(dir1),
+ new TestFilter(dir2));
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test9): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+ // measure performance when there are multiple pools and
+ // many files in each pool.
+ int numPools = 100;
+ int numFiles = 1000;
+ DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+ for (int i = 0; i < numFiles; i++) {
+ FileInputFormat.setInputPaths(job, file1);
+ }
+ inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+ final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+ final Path dirNoMatch2 = new Path(inDir, "/diryy");
+ for (int i = 0; i < numPools; i++) {
+ inFormat1.createPool(job, new TestFilter(dirNoMatch1),
+ new TestFilter(dirNoMatch2));
+ }
+ long start = System.currentTimeMillis();
+ splits = inFormat1.getSplits(job, 1);
+ long end = System.currentTimeMillis();
+ System.out.println("Elapsed time for " + numPools + " pools " +
+ " and " + numFiles + " files is " +
+ ((end - start)) + " milli seconds.");
+ } finally {
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+
/**
* Test when input files are from non-default file systems
*/
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java Fri Jun 21 06:37:27 2013
@@ -48,9 +48,6 @@ public class TestLineInputFormat extends
JobConf job = new JobConf();
Path file = new Path(workDir, "test.txt");
- int seed = new Random().nextInt();
- Random random = new Random(seed);
-
localFs.delete(workDir, true);
FileInputFormat.setInputPaths(job, workDir);
int numLinesPerMap = 5;
@@ -58,7 +55,8 @@ public class TestLineInputFormat extends
// for a variety of lengths
for (int length = 0; length < MAX_LENGTH;
- length += random.nextInt(MAX_LENGTH/10) + 1) {
+ length += 1) {
+ System.out.println("Processing file of length "+length);
// create a file with length entries
Writer writer = new OutputStreamWriter(localFs.create(file));
try {
@@ -69,14 +67,21 @@ public class TestLineInputFormat extends
} finally {
writer.close();
}
- checkFormat(job, numLinesPerMap);
+ int lastN = 0;
+ if (length != 0) {
+ lastN = length % numLinesPerMap;
+ if (lastN == 0) {
+ lastN = numLinesPerMap;
+ }
+ }
+ checkFormat(job, numLinesPerMap, lastN);
}
}
// A reporter that does nothing
private static final Reporter voidReporter = Reporter.NULL;
- void checkFormat(JobConf job, int expectedN) throws IOException{
+ void checkFormat(JobConf job, int expectedN, int lastN) throws IOException{
NLineInputFormat format = new NLineInputFormat();
format.configure(job);
int ignoredNumSplits = 1;
@@ -84,7 +89,8 @@ public class TestLineInputFormat extends
// check all splits except last one
int count = 0;
- for (int j = 0; j < splits.length -1; j++) {
+ for (int j = 0; j < splits.length; j++) {
+ System.out.println("Processing split "+splits[j]);
assertEquals("There are no split locations", 0,
splits[j].getLocations().length);
RecordReader<LongWritable, Text> reader =
@@ -102,16 +108,22 @@ public class TestLineInputFormat extends
try {
count = 0;
while (reader.next(key, value)) {
+ System.out.println("Got "+key+" "+value+" at count "+count+" of split "+j);
count++;
}
} finally {
reader.close();
}
- assertEquals("number of lines in split is " + expectedN ,
- expectedN, count);
+ if ( j == splits.length - 1) {
+ assertEquals("number of lines in split(" + j + ") is wrong" ,
+ lastN, count);
+ } else {
+ assertEquals("number of lines in split(" + j + ") is wrong" ,
+ expectedN, count);
+ }
}
}
-
+
public static void main(String[] args) throws Exception {
new TestLineInputFormat().testFormat();
}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/lib/db/TestConstructQuery.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
package org.apache.hadoop.mapred.lib.db;
import org.apache.hadoop.io.NullWritable;
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Jun 21 06:37:27 2013
@@ -127,7 +127,7 @@ public class MapReduceTestUtil {
public static Job createCopyJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
conf.setInt("mapred.map.tasks", 3);
- Job theJob = new Job(conf);
+ Job theJob = Job.getInstance(conf);
theJob.setJobName("DataMoveJob");
FileInputFormat.setInputPaths(theJob, indirs);
@@ -157,7 +157,7 @@ public class MapReduceTestUtil {
fs.delete(outdir, true);
}
conf.setInt("mapred.map.max.attempts", 2);
- Job theJob = new Job(conf);
+ Job theJob = Job.getInstance(conf);
theJob.setJobName("Fail-Job");
FileInputFormat.setInputPaths(theJob, indirs);
@@ -182,7 +182,7 @@ public class MapReduceTestUtil {
public static Job createKillJob(Configuration conf, Path outdir,
Path... indirs) throws Exception {
- Job theJob = new Job(conf);
+ Job theJob = Job.getInstance(conf);
theJob.setJobName("Kill-Job");
FileInputFormat.setInputPaths(theJob, indirs);
@@ -251,7 +251,7 @@ public class MapReduceTestUtil {
public static Job createJob(Configuration conf, Path inDir, Path outDir,
int numInputFiles, int numReds, String input) throws IOException {
- Job job = new Job(conf);
+ Job job = Job.getInstance(conf);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
fs.delete(outDir, true);
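
The mechanical change through this file swaps the Job constructor for the static factory method; Job.getInstance(Configuration) is the non-deprecated way to obtain a Job in newer MapReduce code. A hedged usage sketch, with the setting and job name taken from the surrounding hunks (the call site declares the checked exceptions, as the helper methods above do):

    Configuration conf = new Configuration();
    conf.setInt("mapred.map.tasks", 3);
    Job theJob = Job.getInstance(conf);   // preferred over new Job(conf)
    theJob.setJobName("DataMoveJob");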
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestLocalRunner.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+
+/**
+ * Stress tests for the LocalJobRunner
+ */
+public class TestLocalRunner extends TestCase {
+
+ private static final Log LOG = LogFactory.getLog(TestLocalRunner.class);
+
+ private static class StressMapper
+ extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+ // Different map tasks operate at different speeds.
+ // We define behavior for 6 threads.
+ private int threadId;
+
+ // Used to ensure that the compiler doesn't optimize away
+ // some code.
+ public long exposedState;
+
+ protected void setup(Context context) {
+ // Get the thread num from the file number.
+ FileSplit split = (FileSplit) context.getInputSplit();
+ Path filePath = split.getPath();
+ String name = filePath.getName();
+ this.threadId = Integer.valueOf(name);
+
+ LOG.info("Thread " + threadId + " : "
+ + context.getInputSplit());
+ }
+
+ /** Map method with different behavior based on the thread id */
+ public void map(LongWritable key, Text val, Context c)
+ throws IOException, InterruptedException {
+
+ switch(threadId) {
+ case 0:
+ // Write a single value and be done.
+ c.write(new LongWritable(0), val);
+ break;
+ case 1:
+ case 2:
+ // Write many values quickly.
+ for (int i = 0; i < 500; i++) {
+ c.write(new LongWritable(0), val);
+ }
+ break;
+ case 3:
+ // Write many values, using thread sleeps to delay this.
+ for (int i = 0; i < 50; i++) {
+ for (int j = 0; j < 10; j++) {
+ c.write(new LongWritable(0), val);
+ }
+ Thread.sleep(1);
+ }
+ break;
+ case 4:
+ // Write many values, using busy-loops to delay this.
+ for (int i = 0; i < 500; i++) {
+ for (int j = 0; j < 10000; j++) {
+ this.exposedState++;
+ }
+ c.write(new LongWritable(0), val);
+ }
+ break;
+ case 5:
+ // Write many values, using very slow busy-loops to delay this.
+ for (int i = 0; i < 500; i++) {
+ for (int j = 0; j < 100000; j++) {
+ this.exposedState++;
+ }
+ c.write(new LongWritable(0), val);
+ }
+ break;
+ default:
+ // Write a single value and be done.
+ c.write(new LongWritable(0), val);
+ break;
+ }
+ }
+
+ protected void cleanup(Context context) {
+ // Output this here, to ensure that the incrementing done in map()
+ // cannot be optimized away.
+ LOG.debug("Busy loop counter: " + this.exposedState);
+ }
+ }
+
+ private static class CountingReducer
+ extends Reducer<LongWritable, Text, LongWritable, LongWritable> {
+
+ public void reduce(LongWritable key, Iterable<Text> vals, Context context)
+ throws IOException, InterruptedException {
+ long out = 0;
+ for (Text val : vals) {
+ out++;
+ }
+
+ context.write(key, new LongWritable(out));
+ }
+ }
+
+ /**
+ * Create a single input file in the input directory.
+ * @param dirPath the directory in which the file resides
+ * @param id the file id number
+ * @param numRecords how many records to write to the file.
+ */
+ private void createInputFile(Path dirPath, int id, int numRecords)
+ throws IOException {
+ final String MESSAGE = "This is a line in a file: ";
+
+ Path filePath = new Path(dirPath, "" + id);
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ OutputStream os = fs.create(filePath);
+ BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+ for (int i = 0; i < numRecords; i++) {
+ w.write(MESSAGE + id + " " + i + "\n");
+ }
+
+ w.close();
+ }
+
+ // This is the total number of map output records we expect to generate,
+ // based on input file sizes (see createMultiMapsInput()) and the behavior
+ // of the different StressMapper threads.
+ private final static int TOTAL_RECORDS = 50000
+ + (500 * 500)
+ + (500 * 500)
+ + (20 * 500)
+ + (5000 * 500)
+ + (500 * 500);
+
+ private final String INPUT_DIR = "multiMapInput";
+ private final String OUTPUT_DIR = "multiMapOutput";
+
+ private Path getInputPath() {
+ String dataDir = System.getProperty("test.build.data");
+ if (null == dataDir) {
+ return new Path(INPUT_DIR);
+ } else {
+ return new Path(new Path(dataDir), INPUT_DIR);
+ }
+ }
+
+ private Path getOutputPath() {
+ String dataDir = System.getProperty("test.build.data");
+ if (null == dataDir) {
+ return new Path(OUTPUT_DIR);
+ } else {
+ return new Path(new Path(dataDir), OUTPUT_DIR);
+ }
+ }
+
+ /**
+ * Create the inputs for the MultiMaps test.
+ * @return the path to the input directory.
+ */
+ private Path createMultiMapsInput() throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path inputPath = getInputPath();
+
+ // Clear the input directory if it exists, first.
+ if (fs.exists(inputPath)) {
+ fs.delete(inputPath, true);
+ }
+
+ // Create input files, with sizes calibrated based on
+ // the amount of work done in each mapper.
+ createInputFile(inputPath, 0, 50000);
+ createInputFile(inputPath, 1, 500);
+ createInputFile(inputPath, 2, 500);
+ createInputFile(inputPath, 3, 20);
+ createInputFile(inputPath, 4, 5000);
+ createInputFile(inputPath, 5, 500);
+
+ return inputPath;
+ }
+
+ /**
+ * Verify that we got the correct amount of output.
+ */
+ private void verifyOutput(Path outputPath) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ Path outputFile = new Path(outputPath, "part-r-00000");
+ InputStream is = fs.open(outputFile);
+ BufferedReader r = new BufferedReader(new InputStreamReader(is));
+
+ // Should get a single line of the form "0\t(count)"
+ String line = r.readLine().trim();
+ assertTrue("Line does not have correct key", line.startsWith("0\t"));
+ int count = Integer.valueOf(line.substring(2));
+ assertEquals("Incorrect count generated!", TOTAL_RECORDS, count);
+
+ r.close();
+
+ }
+
+ /**
+ * Run a test with several mappers in parallel, operating at different
+ * speeds. Verify that the correct amount of output is created.
+ */
+ @Test
+ public void testMultiMaps() throws Exception {
+ Job job = new Job();
+
+ Path inputPath = createMultiMapsInput();
+ Path outputPath = getOutputPath();
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ job.setMapperClass(StressMapper.class);
+ job.setReducerClass(CountingReducer.class);
+ job.setNumReduceTasks(1);
+ LocalJobRunner.setLocalMaxRunningMaps(job, 6);
+ job.getConfiguration().set("io.sort.record.pct", "0.50");
+ job.getConfiguration().set("io.sort.mb", "25");
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ job.waitForCompletion(true);
+
+ verifyOutput(outputPath);
+ }
+
+ /**
+ * Run a test with a misconfigured number of mappers.
+ * Expect failure.
+ */
+ @Test
+ public void testInvalidMultiMapParallelism() throws Exception {
+ Job job = new Job();
+
+ Path inputPath = createMultiMapsInput();
+ Path outputPath = getOutputPath();
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ job.setMapperClass(StressMapper.class);
+ job.setReducerClass(CountingReducer.class);
+ job.setNumReduceTasks(1);
+ LocalJobRunner.setLocalMaxRunningMaps(job, -6);
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertFalse("Job succeeded somehow", success);
+ }
+
+ /** An InputFormat that creates no splits */
+ private static class EmptyInputFormat extends InputFormat<Object, Object> {
+ public List<InputSplit> getSplits(JobContext context) {
+ return new ArrayList<InputSplit>();
+ }
+
+ public RecordReader<Object, Object> createRecordReader(InputSplit split,
+ TaskAttemptContext context) {
+ return new EmptyRecordReader();
+ }
+ }
+
+ private static class EmptyRecordReader extends RecordReader<Object, Object> {
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ }
+
+ public Object getCurrentKey() {
+ return new Object();
+ }
+
+ public Object getCurrentValue() {
+ return new Object();
+ }
+
+ public float getProgress() {
+ return 0.0f;
+ }
+
+ public void close() {
+ }
+
+ public boolean nextKeyValue() {
+ return false;
+ }
+ }
+
+ /** Test case for zero mappers */
+ public void testEmptyMaps() throws Exception {
+ Job job = new Job();
+ Path outputPath = getOutputPath();
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ job.setInputFormatClass(EmptyInputFormat.class);
+ job.setNumReduceTasks(1);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ boolean success = job.waitForCompletion(true);
+ assertTrue("Empty job should work", success);
+ }
+}
\ No newline at end of file
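
For reference, TOTAL_RECORDS in the added test follows directly from the input sizes and the per-record output of each StressMapper branch: file 0 (50000 lines) is echoed once per line, files 1, 2 and 5 (500 lines each) and file 4 (5000 lines) emit 500 outputs per line, and file 3 (20 lines) emits 10 outputs in each of 50 iterations, i.e. 500 per line as well. A quick standalone check of the arithmetic (not part of the patch):

    // Recompute the expected map-output count from file sizes and per-line output.
    int[] fileLines   = {50000, 500, 500,  20, 5000, 500};
    int[] outsPerLine = {    1, 500, 500, 500,  500, 500};
    long total = 0;
    for (int i = 0; i < fileLines.length; i++) {
      total += (long) fileLines[i] * outsPerLine[i];
    }
    System.out.println(total);   // 3310000, matching TOTAL_RECORDS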
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMROutputFormat.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
package org.apache.hadoop.mapreduce;
import java.io.DataInput;
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Fri Jun 21 06:37:27 2013
@@ -97,7 +97,7 @@ public class TestMapReduceLocal extends
private float last = 0.0f;
private boolean progressCalled = false;
@Override
- public float getProgress() {
+ public float getProgress() throws IOException {
progressCalled = true;
final float ret = super.getProgress();
assertTrue("getProgress decreased", ret >= last);
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri Jun 21 06:37:27 2013
@@ -1130,7 +1130,7 @@ public class TestCombineFileInputFormat
dos.writeChars("Local file for CFIF");
dos.close();
- Job job = new Job(conf);
+ Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
DummyInputFormat inFormat = new DummyInputFormat();
List<InputSplit> splits = inFormat.getSplits(job);
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -211,7 +211,7 @@ public class TestKeyValueTextInputFormat
* Test using the gzip codec for reading
*/
public static void testGzip() throws Exception {
- Job job = new Job();
+ Job job = Job.getInstance();
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job.getConfiguration());
localFs.delete(workDir, true);
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Jun 21 06:37:27 2013
@@ -49,37 +49,40 @@ public class TestNLineInputFormat extend
Job job = new Job(conf);
Path file = new Path(workDir, "test.txt");
- int seed = new Random().nextInt();
- Random random = new Random(seed);
-
localFs.delete(workDir, true);
FileInputFormat.setInputPaths(job, workDir);
int numLinesPerMap = 5;
NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
- // for a variety of lengths
for (int length = 0; length < MAX_LENGTH;
- length += random.nextInt(MAX_LENGTH / 10) + 1) {
+ length += 1) {
+
// create a file with length entries
Writer writer = new OutputStreamWriter(localFs.create(file));
try {
for (int i = 0; i < length; i++) {
- writer.write(Integer.toString(i));
+ writer.write(Integer.toString(i)+" some more text");
writer.write("\n");
}
} finally {
writer.close();
}
- checkFormat(job, numLinesPerMap);
+ int lastN = 0;
+ if (length != 0) {
+ lastN = length % numLinesPerMap;
+ if (lastN == 0) {
+ lastN = numLinesPerMap;
+ }
+ }
+ checkFormat(job, numLinesPerMap, lastN);
}
}
- void checkFormat(Job job, int expectedN)
+ void checkFormat(Job job, int expectedN, int lastN)
throws IOException, InterruptedException {
NLineInputFormat format = new NLineInputFormat();
List<InputSplit> splits = format.getSplits(job);
- // check all splits except last one
int count = 0;
- for (int i = 0; i < splits.size() -1; i++) {
+ for (int i = 0; i < splits.size(); i++) {
assertEquals("There are no split locations", 0,
splits.get(i).getLocations().length);
TaskAttemptContext context = MapReduceTestUtil.
@@ -103,8 +106,13 @@ public class TestNLineInputFormat extend
} finally {
reader.close();
}
- assertEquals("number of lines in split is " + expectedN ,
- expectedN, count);
+ if ( i == splits.size() - 1) {
+ assertEquals("number of lines in split(" + i + ") is wrong" ,
+ lastN, count);
+ } else {
+ assertEquals("number of lines in split(" + i + ") is wrong" ,
+ expectedN, count);
+ }
}
}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Fri Jun 21 06:37:27 2013
@@ -173,7 +173,8 @@ public class TestMRSequenceFileAsBinaryO
public void testcheckOutputSpecsForbidRecordCompression()
throws IOException {
- Job job = new Job();
+ Job job = Job.getInstance(new Configuration(),
+ "testcheckOutputSpecsForbidRecordCompression");
FileSystem fs = FileSystem.getLocal(job.getConfiguration());
Path outputdir = new Path(System.getProperty("test.build.data", "/tmp")
+ "/output");
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Fri Jun 21 06:37:27 2013
@@ -136,7 +136,7 @@ public class TestInputSampler {
for (int i = 0; i < TOT_SPLITS; ++i) {
inits[i] = i;
}
- Job ignored = new Job();
+ Job ignored = Job.getInstance();
Object[] samples = sampler.getSample(new TestInputSamplerIF(
NUM_SAMPLES, TOT_SPLITS, inits), ignored);
assertEquals(NUM_SAMPLES, samples.length);
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/TestTokenCache.java Fri Jun 21 06:37:27 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.secu
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -311,4 +312,15 @@ public class TestTokenCache {
TokenCache.obtainTokensForNamenodesInternal(
credentials, new Path [] {finalPath}, jConf);
}
+
+
+ @Test
+ public void testCleanUpTokenReferral() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(TokenCache.MAPREDUCE_JOB_CREDENTIALS_BINARY, "foo");
+ TokenCache.cleanUpTokenReferral(conf);
+ assertNull(conf.get(TokenCache.MAPREDUCE_JOB_CREDENTIALS_BINARY));
+ }
+
+
}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java Fri Jun 21 06:37:27 2013
@@ -65,6 +65,12 @@ public class TestDelegationTokenRenewal
private static Token<?> lastRenewed = null;
private static Token<?> tokenToRenewIn2Sec = null;
+ private static void reset() {
+ counter = 0;
+ lastRenewed = null;
+ tokenToRenewIn2Sec = null;
+ }
+
@Override
public boolean handleKind(Text kind) {
return KIND.equals(kind);
@@ -109,6 +115,7 @@ public class TestDelegationTokenRenewal
@BeforeClass
public static void setUp() throws Exception {
+ Renewer.reset();
conf = new Configuration();
conf.set("mapred.job.tracker", trackerService);
@@ -354,4 +361,19 @@ public class TestDelegationTokenRenewal
//expected
}
}
+
+ /**
+ * Run testDTRenewal(), close DelegationTokenRenewal, and run testDTRenewal()
+ * again to make sure that DelegationTokenRenewal can be re-used after it has
+ * been closed.
+ * @throws Exception
+ */
+ @Test
+ public void testDTRenewalAfterClose() throws Exception {
+ Renewer.counter = 0;
+ testDTRenewal();
+ DelegationTokenRenewal.close();
+ Renewer.counter = 0;
+ testDTRenewal();
+ }
}
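
The Renewer.reset() helper added above exists because the test renewer keeps its bookkeeping (counter, lastRenewed, tokenToRenewIn2Sec) in static fields, so state would otherwise leak between test runs; setUp() now clears it before the class runs. A generic sketch of the pattern (class name illustrative, not part of the patch):

    public class CountingRenewer {
      static int counter;              // static state shared across tests
      static void reset() {            // invoked from a @BeforeClass / setUp hook
        counter = 0;
      }
    }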