Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [37/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import junit.framework.TestCase;
+
+public class TestCompressionStreamReuse extends TestCase {
+  private static final Log LOG = LogFactory
+      .getLog(TestCompressionStreamReuse.class);
+
+  private Configuration conf = new Configuration();
+  private int count = 10000;
+  private int seed = new Random().nextInt();
+
+  public void testBZip2Codec() throws IOException {
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.BZip2Codec");
+  }
+
+  public void testGzipCompressStreamReuse() throws IOException {
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.GzipCodec");
+  }
+
+  public void testGzipCompressStreamReuseWithParam() throws IOException {
+    Configuration conf = new Configuration(this.conf);
+    ZlibFactory
+        .setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+    ZlibFactory.setCompressionStrategy(conf,
+        CompressionStrategy.HUFFMAN_ONLY);
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.GzipCodec");
+  }
+
+  private static void resetStateTest(Configuration conf, int seed, int count,
+      String codecClass) throws IOException {
+    // Create the codec
+    CompressionCodec codec = null;
+    try {
+      codec = (CompressionCodec) ReflectionUtils.newInstance(conf
+          .getClassByName(codecClass), conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Illegal codec!");
+    }
+    LOG.info("Created a Codec object of type: " + codecClass);
+
+    // Generate data
+    DataOutputBuffer data = new DataOutputBuffer();
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; ++i) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      key.write(data);
+      value.write(data);
+    }
+    LOG.info("Generated " + count + " records");
+
+    // Compress data
+    DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+    DataOutputStream deflateOut = new DataOutputStream(
+        new BufferedOutputStream(compressedDataBuffer));
+    CompressionOutputStream deflateFilter = codec
+        .createOutputStream(deflateOut);
+    deflateFilter.write(data.getData(), 0, data.getLength());
+    deflateFilter.finish();
+    deflateFilter.flush();
+    LOG.info("Finished compressing data");
+
+    // reset the deflater
+    deflateFilter.resetState();
+    LOG.info("Finished resetting deflater");
+
+    // re-generate data
+    data.reset();
+    generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; ++i) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      key.write(data);
+      value.write(data);
+    }
+    DataInputBuffer originalData = new DataInputBuffer();
+    DataInputStream originalIn = new DataInputStream(
+        new BufferedInputStream(originalData));
+    originalData.reset(data.getData(), 0, data.getLength());
+
+    // re-compress data
+    compressedDataBuffer.reset();
+    deflateOut = new DataOutputStream(new BufferedOutputStream(
+        compressedDataBuffer));
+    deflateFilter = codec.createOutputStream(deflateOut);
+
+    deflateFilter.write(data.getData(), 0, data.getLength());
+    deflateFilter.finish();
+    deflateFilter.flush();
+    LOG.info("Finished re-compressing data");
+
+    // De-compress data
+    DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+    deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+        compressedDataBuffer.getLength());
+    CompressionInputStream inflateFilter = codec
+        .createInputStream(deCompressedDataBuffer);
+    DataInputStream inflateIn = new DataInputStream(
+        new BufferedInputStream(inflateFilter));
+
+    // Check
+    for (int i = 0; i < count; ++i) {
+      RandomDatum k1 = new RandomDatum();
+      RandomDatum v1 = new RandomDatum();
+      k1.readFields(originalIn);
+      v1.readFields(originalIn);
+
+      RandomDatum k2 = new RandomDatum();
+      RandomDatum v2 = new RandomDatum();
+      k2.readFields(inflateIn);
+      v2.readFields(inflateIn);
+      assertTrue(
+          "original and compressed-then-decompressed-output not equal",
+          k1.equals(k2) && v1.equals(v2));
+    }
+    LOG.info("SUCCESS! Completed checking " + count + " records");
+  }
+}

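For reference, a minimal standalone sketch (not part of the commit; class name illustrative) of the reuse pattern the test above exercises: finish one compressed stream, call resetState(), and push a second, independent stream through the same filter. It uses only codec APIs that appear in the test.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CompressionReuseSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);

        DataOutputBuffer sink = new DataOutputBuffer();
        CompressionOutputStream out = codec.createOutputStream(sink);

        out.write("first payload".getBytes());
        out.finish();       // complete the first compressed stream
        out.flush();

        out.resetState();   // rewind compressor state so the filter can be reused
        sink.reset();       // start a fresh output buffer

        out.write("second payload".getBytes());
        out.finish();       // a second, self-contained compressed stream
        out.flush();
      }
    }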
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java Fri Jun 21 06:37:27 2013
@@ -272,6 +272,65 @@ public class TestNativeIO {
     assertPermissions(toChmod, 0644);
   }
 
+  @Test
+  public void testPosixFadvise() throws Exception {
+    FileInputStream fis = new FileInputStream("/dev/zero");
+    try {
+      NativeIO.posix_fadvise(fis.getFD(), 0, 0,
+                             NativeIO.POSIX_FADV_SEQUENTIAL);
+    } catch (UnsupportedOperationException uoe) {
+      // we should just skip the unit test on machines where we don't
+      // have fadvise support
+      assumeTrue(false);
+    } finally {
+      fis.close();
+    }
+
+    try {
+      NativeIO.posix_fadvise(fis.getFD(), 0, 1024,
+                             NativeIO.POSIX_FADV_SEQUENTIAL);
+
+      fail("Did not throw on bad file");
+    } catch (NativeIOException nioe) {
+      assertEquals(Errno.EBADF, nioe.getErrno());
+    }
+    
+    try {
+      NativeIO.posix_fadvise(null, 0, 1024,
+                             NativeIO.POSIX_FADV_SEQUENTIAL);
+
+      fail("Did not throw on null file");
+    } catch (NullPointerException npe) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testSyncFileRange() throws Exception {
+    FileOutputStream fos = new FileOutputStream(
+      new File(TEST_DIR, "testSyncFileRange"));
+    try {
+      fos.write("foo".getBytes());
+      NativeIO.sync_file_range(fos.getFD(), 0, 1024,
+                               NativeIO.SYNC_FILE_RANGE_WRITE);
+      // no way to verify that this actually has synced,
+      // but if it doesn't throw, we can assume it worked
+    } catch (UnsupportedOperationException uoe) {
+      // we should just skip the unit test on machines where we don't
+      // have sync_file_range support
+      assumeTrue(false);
+    } finally {
+      fos.close();
+    }
+    try {
+      NativeIO.sync_file_range(fos.getFD(), 0, 1024,
+                               NativeIO.SYNC_FILE_RANGE_WRITE);
+      fail("Did not throw on bad file");
+    } catch (NativeIOException nioe) {
+      assertEquals(Errno.EBADF, nioe.getErrno());
+    }
+  }
+
   private void assertPermissions(File f, int expected) throws IOException {
     FileSystem localfs = FileSystem.getLocal(new Configuration());
     FsPermission perms = localfs.getFileStatus(

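As a usage note, a hedged sketch of the pattern testPosixFadvise validates, assuming the branch-1 NativeIO signature shown in the diff: issue the advisory call where the platform supports it and degrade gracefully elsewhere. The helper class and method names are illustrative.

    import java.io.FileDescriptor;
    import java.io.IOException;
    import org.apache.hadoop.io.nativeio.NativeIO;

    public class FadviseSketch {
      /** Hint sequential access for the whole file; a no-op where unsupported. */
      public static void adviseSequential(FileDescriptor fd) throws IOException {
        try {
          // offset 0 with length 0 covers the file from start to end
          NativeIO.posix_fadvise(fd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
        } catch (UnsupportedOperationException e) {
          // no native fadvise on this platform (e.g. Windows); reads still work
        }
      }
    }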
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Jun 21 06:37:27 2013
@@ -18,31 +18,38 @@
 
 package org.apache.hadoop.ipc;
 
-import org.apache.hadoop.metrics2.MetricsSource;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.lang.reflect.Method;
-
-import junit.framework.TestCase;
-
 import java.util.Arrays;
 
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.ipc.metrics.RpcInstrumentation;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.AccessControlException;
-import static org.apache.hadoop.test.MetricsAsserts.*;
 
 /** Unit tests for RPC. */
 public class TestRPC extends TestCase {
@@ -133,6 +140,14 @@ public class TestRPC extends TestCase {
     }
   }
 
+  public static class TestVersionMismatchImpl extends TestImpl {
+    /** @return a different version. */
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return super.getProtocolVersion(protocol, clientVersion) + 1;
+    }
+  }
+
   //
   // an object that does a bunch of transactions
   //
@@ -398,6 +413,50 @@ public class TestRPC extends TestCase {
       assertCounter("rpcAuthenticationSuccesses", 0, rb);
     }
   }
+
+  /**
+   * Count the number of threads that have a stack frame containing
+   * the given string
+   */
+  private static int countThreads(String search) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+    int count = 0;
+    ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
+    for (ThreadInfo info : infos) {
+      if (info == null) continue;
+      for (StackTraceElement elem : info.getStackTrace()) {
+        if (elem.getClassName().contains(search)) {
+          count++;
+          break;
+        }
+      }
+    }
+    return count;
+  }
+
+
+  /**
+   * Test that server.stop() properly stops all threads
+   */
+  public void testStopsAllThreads() throws Exception {
+    int threadsBefore = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads running before test",
+      0, threadsBefore);
+
+    final Server server = RPC.getServer(new TestImpl(), ADDRESS,
+        0, 5, true, conf);
+    server.start();
+    try {
+      int threadsRunning = countThreads("Server$Listener$Reader");
+      assertTrue(threadsRunning > 0);
+    } finally {
+      server.stop();
+    }
+    int threadsAfter = countThreads("Server$Listener$Reader");
+    assertEquals("Expect no Reader threads left running after test",
+      0, threadsAfter);
+  }
   
   public void testAuthorization() throws Exception {
     Configuration conf = new Configuration();
@@ -480,6 +539,35 @@ public class TestRPC extends TestCase {
     }
     assertTrue(succeeded);
   }
+
+  /** Test RPC.checkVersion method. */
+  public void testCheckVersion() throws Exception {
+    Server server = RPC.getServer(new TestVersionMismatchImpl(), ADDRESS, 0, conf);
+    TestProtocol proxy = null;
+    try {
+      server.start();
+    
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      
+      // get proxy should succeed
+      proxy = (TestProtocol)RPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, ugi, conf,
+          NetUtils.getSocketFactory(conf, TestProtocol.class), 0, null, false);  
+
+      try {
+        RPC.checkVersion(TestProtocol.class, TestProtocol.versionID, proxy);
+        fail("Check version should throw VersionMismatch");
+      } catch(VersionMismatch vm) {
+        LOG.info("The VersionMismatch is expected", vm);
+      }
+    } finally {
+      server.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
  
   public static void main(String[] args) throws Exception {
 

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java Fri Jun 21 06:37:27 2013
@@ -366,7 +366,10 @@ public class TestSaslRPC {
     server.start();
 
     final UserGroupInformation current = UserGroupInformation.getCurrentUser();
-    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    // don't use the address the rpc server claims to be bound to, since
+    // it's the client's responsibility to set the service
+    final InetSocketAddress addr = NetUtils.createSocketAddr(
+        ADDRESS, server.getListenerAddress().getPort());
     TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
         .getUserName()));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+/**
+ * This is intended to be a set of unit tests for the 
+ * org.apache.hadoop.ipc.Server class.
+ */
+public class TestServer {
+
+  @Test
+  public void testExceptionsHandler() throws IOException {
+    Server.ExceptionsHandler handler = new Server.ExceptionsHandler();
+    handler.addTerseExceptions(IOException.class);
+    handler.addTerseExceptions(RemoteException.class);
+
+    assertTrue(handler.isTerse(IOException.class));
+    assertTrue(handler.isTerse(RemoteException.class));
+  }
+}

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java Fri Jun 21 06:37:27 2013
@@ -31,6 +31,11 @@ public interface MiniMRClientCluster {
 
   public void start() throws IOException;
 
+  /**
+   * Stop, then start the cluster back up with the same configuration.
+   */
+  public void restart() throws IOException;
+
   public void stop() throws IOException;
 
   public Configuration getConfig() throws IOException;

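A small sketch (hypothetical helper, not in the commit) of how the new restart() contract reads at a call site: the cluster comes back up with the same configuration it was started with.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.MiniMRClientCluster;

    public class RestartSketch {
      /** Bounce a running cluster and return its (unchanged) configuration. */
      public static Configuration bounce(MiniMRClientCluster cluster)
          throws IOException {
        cluster.restart();           // stop, then start back up
        return cluster.getConfig();  // same configuration as before the restart
      }
    }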
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Jun 21 06:37:27 2013
@@ -56,7 +56,7 @@ public class MiniMRCluster {
     
   private String namenode;
   private UserGroupInformation ugi = null;
-  private JobConf conf;
+  protected JobConf conf;
   private int numTrackerToExclude;
     
   private JobConf job;
@@ -100,8 +100,10 @@ public class MiniMRCluster {
     public void run() {
       try {
         jc = (jc == null) ? createJobConf() : createJobConf(jc);
-        File f = new File("build/test/mapred/local").getAbsoluteFile();
-        jc.set("mapred.local.dir",f.getAbsolutePath());
+        String localPath = System.getProperty("test.build.data",
+            "build/test/mapred/local");
+        File f = new File(localPath).getAbsoluteFile();
+        jc.set("mapred.local.dir", f.getAbsolutePath());
         jc.setClass("topology.node.switch.mapping.impl", 
             StaticMapping.class, DNSToSwitchMapping.class);
         final String id =
@@ -333,7 +335,8 @@ public class MiniMRCluster {
   private void waitTaskTrackers() {
     for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) {
       TaskTrackerRunner runner = itr.next();
-      while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
+      while (!runner.isDead && (!runner.isInitialized
+        ||  !runner.tt.isIdleAndClean())) {
         if (!runner.isInitialized) {
           LOG.info("Waiting for task tracker to start.");
         } else {

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java Fri Jun 21 06:37:27 2013
@@ -20,7 +20,11 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 
 /**
  * An adapter for MiniMRCluster providing a MiniMRClientCluster interface. This
@@ -30,6 +34,8 @@ public class MiniMRClusterAdapter implem
 
   private MiniMRCluster miniMRCluster;
 
+  private static final Log LOG = LogFactory.getLog(MiniMRClusterAdapter.class);
+
   public MiniMRClusterAdapter(MiniMRCluster miniMRCluster) {
     this.miniMRCluster = miniMRCluster;
   }
@@ -50,4 +56,37 @@ public class MiniMRClusterAdapter implem
     miniMRCluster.shutdown();
   }
 
+  @Override
+  public void restart() throws IOException {
+    if (!miniMRCluster.getJobTrackerRunner().isActive()) {
+      LOG.warn("Cannot restart the mini cluster, start it first");
+      return;
+    }
+
+    int jobTrackerPort = miniMRCluster.getJobTrackerPort();
+    int taskTrackerPort = getConfig().getInt(
+        "mapred.task.tracker.report.address", 0);
+    int numTaskTrackers = miniMRCluster.getNumTaskTrackers();
+    String namenode = getConfig().get(FileSystem.FS_DEFAULT_NAME_KEY);
+
+    stop();
+    
+    // keep retrying to start the cluster for a max. of 30 sec.
+    for (int i = 0; i < 30; i++) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      try {
+        miniMRCluster = new MiniMRCluster(jobTrackerPort, taskTrackerPort,
+            numTaskTrackers, namenode, 1);
+        break;
+      } catch (Exception e) {
+        LOG.info("Retrying to start the cluster");
+      }
+    }
+
+  }
+
 }

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+
+public class MiniMRClusterWithNodeGroup extends MiniMRCluster {
+
+  private String[] nodeGroups;
+
+  public MiniMRClusterWithNodeGroup(int numTaskTrackers, String namenode, int numDir, 
+      String[] racks, String[] nodeGroups, String[] hosts, JobConf conf) throws IOException {
+    super(numTaskTrackers, namenode, numDir, racks, hosts, conf);
+    this.nodeGroups = nodeGroups;
+  }
+
+  /**
+   * Start the tasktracker.
+   */
+  @Override
+  public void startTaskTracker(String host, String rack,
+      int idx, int numDir) throws IOException {
+    if (rack != null && nodeGroups != null) {
+      // topology path for this tracker is its rack plus its node group
+      StaticMapping.addNodeToRack(host, rack + nodeGroups[idx]);
+    }
+
+    if (host != null) {
+      NetUtils.addStaticResolution(host, "localhost");
+    }
+
+    TaskTrackerRunner taskTracker;
+    taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
+
+    addTaskTracker(taskTracker);
+  }
+
+}

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java Fri Jun 21 06:37:27 2013
@@ -95,7 +95,7 @@ public abstract class NotificationTestCa
   }
 
   public static class NotificationServlet extends HttpServlet {
-    public static int counter = 0;
+    public static volatile int counter = 0;
 
     protected void doGet(HttpServletRequest req, HttpServletResponse res)
       throws ServletException, IOException {
@@ -159,9 +159,10 @@ public abstract class NotificationTestCa
   public void testMR() throws Exception {
     System.out.println(launchWordCount(this.createJobConf(),
                                        "a b c d e f g h", 1, 1));
-    synchronized(Thread.currentThread()) {
-      stdPrintln("Sleeping for 2 seconds to give time for retry");
-      Thread.currentThread().sleep(2000);
+    boolean keepTrying = true;
+    for (int tries = 0; tries < 30 && keepTrying; tries++) {
+      Thread.sleep(50);
+      keepTrying = !(NotificationServlet.counter == 2);
     }
     assertEquals(2, NotificationServlet.counter);
     
@@ -179,18 +180,20 @@ public abstract class NotificationTestCa
     // run a job with KILLED status
     System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
                                                 outDir).getID());
-    synchronized(Thread.currentThread()) {
-      stdPrintln("Sleeping for 2 seconds to give time for retry");
-      Thread.currentThread().sleep(2000);
+    keepTrying = true;
+    for (int tries = 0; tries < 30 && keepTrying; tries++) {
+      Thread.sleep(50);
+      keepTrying = !(NotificationServlet.counter == 4);
     }
     assertEquals(4, NotificationServlet.counter);
     
     // run a job with FAILED status
     System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
                                                 outDir).getID());
-    synchronized(Thread.currentThread()) {
-      stdPrintln("Sleeping for 2 seconds to give time for retry");
-      Thread.currentThread().sleep(2000);
+    keepTrying = true;
+    for (int tries = 0; tries < 30 && keepTrying; tries++) {
+      Thread.sleep(50);
+      keepTrying = !(NotificationServlet.counter == 6);
     }
     assertEquals(6, NotificationServlet.counter);
   }

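The change above replaces fixed two-second sleeps with short polling loops. The pattern generalizes; a hypothetical helper (not in the commit) that captures it:

    /** Hypothetical poll-with-timeout helper mirroring the loops above. */
    public class WaitFor {
      public interface Condition {
        boolean met();
      }

      /** Poll up to 'attempts' times, sleeping between checks. */
      public static boolean waitFor(Condition c, int attempts, long sleepMillis)
          throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
          if (c.met()) {
            return true;  // condition reached early; stop waiting
          }
          Thread.sleep(sleepMillis);
        }
        return c.met();   // one final check after the last sleep
      }
    }

With it, each block above reduces to a single waitFor(...) call checking NotificationServlet.counter against the expected value (here, 30 attempts at 50 ms each).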
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.mapred;
 
 import java.net.InetAddress;

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java Fri Jun 21 06:37:27 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -24,14 +26,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * Class to test that ClusterMetrics are being created with the right
@@ -39,41 +38,44 @@ import junit.framework.TestSuite;
  * 
  * The tests exercise code paths where the counts of slots are updated.
  */
-public class TestClusterStatus extends TestCase {
+public class TestClusterStatus {
 
   private static String[] trackers = new String[] { "tracker_tracker1:1000",
       "tracker_tracker2:1000", "tracker_tracker3:1000" };
-  private static JobTracker jobTracker;
-  private static int mapSlotsPerTracker = 4;
-  private static int reduceSlotsPerTracker = 2;
-  private static MiniMRCluster mr;
-  private static JobClient client;
+  private JobTracker jobTracker;
+  private final static int mapSlotsPerTracker = 4;
+  private final static int reduceSlotsPerTracker = 2;
+  private MiniMRCluster mr;
+  private JobClient client;
   // heartbeat responseId. increment this after sending a heartbeat
-  private static short responseId = 1;
+  private short responseId;
   private static FakeJobInProgress fakeJob;
   private static FakeTaskScheduler scheduler;
   
-  public static Test suite() {
-    TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
-      protected void setUp() throws Exception {
-        JobConf conf = new JobConf();
-        conf.setClass("mapred.jobtracker.taskScheduler", 
-            TestClusterStatus.FakeTaskScheduler.class,
-                  TaskScheduler.class);
-        mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
-        jobTracker = mr.getJobTrackerRunner().getJobTracker();
-        for (String tracker : trackers) {
-          establishFirstContact(jobTracker, tracker);
-        }
-        client = new JobClient(mr.createJobConf());
-      }
-
-      protected void tearDown() throws Exception {
-        client.close();
-        mr.shutdown();
-      }
-    };
-    return setup;
+  @Before
+  public void setUp() throws Exception {
+    responseId = 1;
+    JobConf conf = new JobConf();
+    conf.setClass("mapred.jobtracker.taskScheduler", 
+        TestClusterStatus.FakeTaskScheduler.class,
+        TaskScheduler.class);
+    mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
+    jobTracker = mr.getJobTrackerRunner().getJobTracker();
+    for (String tracker : trackers) {
+      establishFirstContact(jobTracker, tracker);
+    }
+    client = new JobClient(mr.createJobConf());
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    client.close();
+    mr.shutdown();
+    fakeJob = null;
+    scheduler = null;
+    client = null;
+    mr = null;
+    jobTracker = null;
   }
 
   /**
@@ -157,6 +159,7 @@ public class TestClusterStatus extends T
       taskStatuses, 0, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
   }
   
+  @Test
   public void testClusterMetrics() throws IOException, InterruptedException {
     assertEquals("tasktracker count doesn't match", trackers.length,
       client.getClusterStatus().getTaskTrackers());
@@ -256,6 +259,7 @@ public class TestClusterStatus extends T
     list.add(ts);
   }
 
+  @Test
   public void testReservedSlots() throws IOException {
     JobConf conf = mr.createJobConf();
 
@@ -308,6 +312,7 @@ public class TestClusterStatus extends T
         0, metrics.getReservedReduceSlots());
   }
   
+  @Test
   public void testClusterStatus() throws Exception {
     ClusterStatus clusterStatus = client.getClusterStatus();
     assertEquals("JobTracker used-memory is " + clusterStatus.getUsedMemory() + 

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java Fri Jun 21 06:37:27 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS.TestResult;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobClientRetries {
+  
+  private static final Log LOG = LogFactory.getLog(TestJobClientRetries.class);
+  
+  MiniMRCluster mr;
+  
+  @Test
+  public void testJobSubmission() throws Exception {
+    
+    // Start MR cluster
+    mr = new MiniMRCluster(2, "file:///", 3);
+    
+    final List<Exception> exceptions = new ArrayList<Exception>();
+
+    // Get jobConf
+    final JobConf jobConf = mr.createJobConf();
+    
+    // Stop JobTracker
+    LOG.info("Stopping JobTracker");
+    mr.stopJobTracker();
+    
+    /*
+     * Submit job *after* setting job-client retries to be *on*...
+     * the test *should* fail without this config being set
+     */
+    LOG.info("Stopping JobTracker");
+    jobConf.setBoolean(
+        JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+    WordCountThread wc = new WordCountThread(jobConf, exceptions);
+    wc.start();
+    
+    // Restart JobTracker after a little while
+    Thread.sleep(5000);
+    LOG.info("Re-starting JobTracker for job-submission to go through");
+    mr.startJobTracker();
+    
+    // Wait for the job to complete or for an exception to occur
+    LOG.info("Waiting for job success/failure ...");
+    wc.join();
+
+    Assert.assertNotNull(wc.result);
+    Assert.assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+        "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", wc.result.output);
+    Assert.assertTrue("exceptions is not empty: " + exceptions, exceptions.isEmpty());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    mr.shutdown();
+  }
+  
+  public static class WordCountThread extends Thread {
+    JobConf jobConf;
+    List<Exception> exceptions;
+    TestResult result;
+    
+    public WordCountThread(JobConf jobConf, List<Exception> exceptions) {
+      super(WordCountThread.class.getName());
+      this.jobConf = jobConf;
+      this.exceptions = exceptions;
+    }
+
+    @Override
+    public void run() {
+      try {
+        FileSystem fs = FileSystem.getLocal(jobConf);
+        Path testdir = new Path(
+            System.getProperty("test.build.data","/tmp")).makeQualified(fs);
+        final Path inDir = new Path(testdir, "input");
+        final Path outDir = new Path(testdir, "output");
+        String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+        LOG.info("Starting word-count");
+        result = 
+            TestMiniMRWithDFS.launchWordCount(
+                jobConf, inDir, outDir, input, 3, 1);
+        LOG.info("Finished word-count");
+      } catch (Exception e) {
+        LOG.error("Caught exception during word-count", e);
+        exceptions.add(e);
+        result = null;
+      }
+    }
+  }
+}

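The heart of the test is one configuration flip, visible in the hunk above: enable the client retry policy before submitting, so the submitting thread rides out a JobTracker restart. A minimal sketch (hypothetical helper name) using only that key:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class RetrySketch {
      /** Turn on job-client retries so submission survives a JobTracker bounce. */
      public static JobConf withClientRetries(JobConf conf) {
        conf.setBoolean(JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
        return conf;
      }
    }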
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Jun 21 06:37:27 2013
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Map;
@@ -34,6 +35,7 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,6 +48,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.Shell;
 
 /**
  * Tests the JobHistory files - to catch any changes to JobHistory that can
@@ -381,6 +384,23 @@ public class TestJobHistory extends Test
                    (status.equals("SUCCESS") || status.equals("FAILED") ||
                     status.equals("KILLED")));
 
+        // Validate task Avataar
+        String avataar = attempt.get(Keys.AVATAAR);
+        assertTrue("Unexpected LOCALITY \"" + avataar + "\" is seen in " +
+            " history file for task attempt " + id,
+            (avataar.equals("VIRGIN") || avataar.equals("SPECULATIVE"))
+        );
+        
+        // Map Task Attempts should have valid LOCALITY
+        if (type.equals("MAP")) {
+          String locality = attempt.get(Keys.LOCALITY);
+          assertTrue("Unexpected LOCALITY \"" + locality + "\" is seen in " +
+              " history file for task attempt " + id,
+              (locality.equals("NODE_LOCAL") || locality.equals("GROUP_LOCAL") ||
+                  locality.equals("RACK_LOCAL") || locality.equals("OFF_SWITCH"))
+          );
+        }
+ 
         // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
         // SORT_FINISHED time
         if (type.equals("REDUCE") && status.equals("SUCCESS")) {
@@ -823,10 +843,33 @@ public class TestJobHistory extends Test
     
     // Validate the job queue name
     assertTrue(jobInfo.getJobQueue().equals(conf.getQueueName()));
+
+    // Validate the workflow properties
+    assertTrue(jobInfo.get(Keys.WORKFLOW_ID).equals(
+        conf.get(JobConf.WORKFLOW_ID, "")));
+    assertTrue(jobInfo.get(Keys.WORKFLOW_NAME).equals(
+        conf.get(JobConf.WORKFLOW_NAME, "")));
+    assertTrue(jobInfo.get(Keys.WORKFLOW_NODE_NAME).equals(
+        conf.get(JobConf.WORKFLOW_NODE_NAME, "")));
+    assertTrue(jobInfo.get(Keys.WORKFLOW_ADJACENCIES).equals(
+        JobHistory.JobInfo.getWorkflowAdjacencies(conf)));
+    assertTrue(jobInfo.get(Keys.WORKFLOW_TAGS).equals(
+        conf.get(JobConf.WORKFLOW_TAGS, "")));
+  }
+
+  public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
+    runDoneFolderTest("history_done");
+  }
+
+  public void testDoneFolderNotOnDefaultFileSystem() throws IOException,
+      InterruptedException {
+    runDoneFolderTest("file:///" + System.getProperty("test.build.data", "tmp")
+        + "/history_done");
   }
 
-  public void testDoneFolderOnHDFS() throws IOException {
+  private void runDoneFolderTest(String doneFolder) throws IOException, InterruptedException {
     MiniMRCluster mr = null;
+    MiniDFSCluster dfsCluster = null;
     try {
       JobConf conf = new JobConf();
       // keep for less time
@@ -834,10 +877,9 @@ public class TestJobHistory extends Test
       conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
 
       //set the done folder location
-      String doneFolder = "history_done";
       conf.set("mapred.job.tracker.history.completed.location", doneFolder);
 
-      MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+      dfsCluster = new MiniDFSCluster(conf, 2, true, null);
       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
           3, null, null, conf);
 
@@ -863,7 +905,7 @@ public class TestJobHistory extends Test
       
       Path doneDir = JobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
-          doneFolder, doneDir.getName());
+          new Path(doneFolder).getName(), doneDir.getName());
       JobID id = job.getID();
       String logFileName = getDoneFile(conf, id, doneDir);
       assertNotNull(logFileName);
@@ -894,28 +936,33 @@ public class TestJobHistory extends Test
       // Test that all of the ancestors of the log file have the same
       //   permissions as the done directory
       
-      Path cursor = logFile.getParent();
-
-      Path doneParent = doneDir.getParent();
-
-      FsPermission donePermission = getStatus(fileSys, doneDir).getPermission();
-
-      System.err.println("testDoneFolderOnHDFS: done dir permission = "
-                         + donePermission);
+      // The folders between the done folder and the folder containing the
+      // log file are created automatically. Since the default permission
+      // on Windows may not match JobHistory.HISTORY_DIR_PERMISSION,
+      // we skip this check when the file system is the local file
+      // system on Windows.
+      if (!(fileSys instanceof LocalFileSystem && Shell.WINDOWS)) {
+        Path cursor = logFile.getParent();
+
+        Path doneParent = doneDir.getParent();
+
+        FsPermission donePermission = getStatus(fileSys, doneDir)
+            .getPermission();
+
+        System.err.println("testDoneFolderOnHDFS: done dir permission = "
+            + donePermission);
+
+        while (!cursor.equals(doneParent)) {
+          FileStatus cursorStatus = getStatus(fileSys, cursor);
+          FsPermission cursorPermission = cursorStatus.getPermission();
+
+          assertEquals("testDoneFolder: A done directory descendant, " + cursor
+              + " does not have the same permisison as the done directory, "
+              + doneDir, donePermission, cursorPermission);
 
-      while (!cursor.equals(doneParent)) {
-        FileStatus cursorStatus = getStatus(fileSys, cursor);
-        FsPermission cursorPermission = cursorStatus.getPermission();
-
-        assertEquals("testDoneFolderOnHDFS: A done directory descendant, "
-                     + cursor
-                     + " does not have the same permisison as the done directory, "
-                     + doneDir,
-                     donePermission,
-                     cursorPermission);
-
-        cursor = cursor.getParent();
-      }      
+          cursor = cursor.getParent();
+        }
+      }
 
       // check if the job file is removed from the history location 
       Path runningJobsHistoryFolder = logFile.getParent().getParent();
@@ -937,6 +984,10 @@ public class TestJobHistory extends Test
         cleanupLocalFiles(mr);
         mr.shutdown();
       }
+
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
     }
   }
 
@@ -980,7 +1031,21 @@ public class TestJobHistory extends Test
       // no queue admins for default queue
       conf.set(QueueManager.toFullPropertyName(
           "default", QueueACL.ADMINISTER_JOBS.getAclName()), " ");
-      
+
+      // set workflow properties
+      conf.set(JobConf.WORKFLOW_ID, "workflowId1");
+      conf.set(JobConf.WORKFLOW_NAME, "workflowName1");
+      String workflowNodeName = "A";
+      conf.set(JobConf.WORKFLOW_NODE_NAME, workflowNodeName);
+      conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName,
+          "BC");
+      conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName,
+          "DEF");
+      conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "DEF", "G");
+      conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "Z",
+          workflowNodeName);
+      conf.set(JobConf.WORKFLOW_TAGS, "tag1,tag2");
+
       mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
 
       // run the TCs
@@ -1292,4 +1357,47 @@ public class TestJobHistory extends Test
       }
     }
   }
+  
+  public void testJobHistoryCleaner() throws Exception {
+    JobConf conf = new JobConf();
+    FileSystem fs = FileSystem.get(conf);
+    JobHistory.DONEDIR_FS = fs;
+    JobHistory.DONE = new Path(TEST_ROOT_DIR + "/done");
+    Path histDirOld = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/05/000000/");
+    Path histDirOnLine = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/06/000000/");
+    final int dayMillis = 1000 * 60 * 60 * 24;
+
+    try {
+      Calendar runTime = Calendar.getInstance();
+      runTime.clear();
+      runTime.set(2013, 1, 8, 12, 0);
+      long runTimeMillis = runTime.getTimeInMillis();
+      
+      fs.mkdirs(histDirOld);
+      fs.mkdirs(histDirOnLine);
+      Path histFileOldDir = new Path(histDirOld, "jobfile1.txt");
+      Path histFileOnLineDir = new Path(histDirOnLine, "jobfile1.txt");
+      Path histFileDontDelete = new Path(histDirOnLine, "jobfile2.txt");
+      fs.create(histFileOldDir).close();
+      fs.create(histFileOnLineDir).close();
+      fs.create(histFileDontDelete).close();
+      new File(histFileOnLineDir.toUri()).setLastModified(
+          runTimeMillis - dayMillis * 5 / 2);
+      new File(histFileDontDelete.toUri()).setLastModified(
+          runTimeMillis - dayMillis * 3 / 2);
+      
+      HistoryCleaner.maxAgeOfHistoryFiles = dayMillis * 2; // two days
+      HistoryCleaner historyCleaner = new HistoryCleaner();
+      
+      historyCleaner.clean(runTimeMillis);
+      
+      assertFalse(fs.exists(histDirOld));
+      assertTrue(fs.exists(histDirOnLine));
+      assertFalse(fs.exists(histFileOldDir));
+      assertFalse(fs.exists(histFileOnLineDir));
+      assertTrue(fs.exists(histFileDontDelete));
+    } finally {
+      fs.delete(JobHistory.DONE, true);
+    }
+  }
 }

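To spell out the cleaner test's timestamp arithmetic: with maxAgeOfHistoryFiles set to two days, a file last modified 2.5 days before the cleaner's run time is purged, while one modified 1.5 days before survives. A tiny self-checking sketch (hypothetical names):

    public class HistoryAgeSketch {
      static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

      /** True when the file falls outside the two-day retention window. */
      static boolean shouldDelete(long lastModified, long runTimeMillis) {
        return runTimeMillis - lastModified > 2 * DAY_MILLIS;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(shouldDelete(now - DAY_MILLIS * 5 / 2, now)); // true
        System.out.println(shouldDelete(now - DAY_MILLIS * 3 / 2, now)); // false
      }
    }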
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java Fri Jun 21 06:37:27 2013
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.util.Date;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
 
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
@@ -115,7 +117,8 @@ public class TestJobHistoryConfig extend
       conf.setQueueName("default");
       String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
           "/tmp")).toString().replace(' ', '+');
-      JobTracker jt = JobTracker.startTracker(conf);
+      String uniqid = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
+      JobTracker jt = JobTracker.startTracker(conf, uniqid, true);
       assertTrue(jt != null);
       JobInProgress jip = new JobInProgress(new JobID("jt", 1),
           new JobConf(conf), jt);
@@ -140,8 +143,9 @@ public class TestJobHistoryConfig extend
   private boolean canStartJobTracker(JobConf conf) throws InterruptedException,
       IOException {
     JobTracker jt = null;
+    String uniqid = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
     try {
-      jt = JobTracker.startTracker(conf);
+      jt = JobTracker.startTracker(conf, uniqid, true);
       Log.info("Started JobTracker");
     } catch (IOException e) {
       Log.info("Can not Start JobTracker", e.getLocalizedMessage());

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java Fri Jun 21 06:37:27 2013
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.fs.Path;
@@ -34,14 +33,13 @@ import org.junit.Assert;
 import junit.framework.TestCase;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.MalformedURLException;
 
 public class TestJobHistoryServer extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryServer.class);
-
+  private String inputPath = System.getProperty("test.build.data",
+      "build/test/data") + "/TestJobHistoryServer";
+  
   public void testHistoryServerEmbedded() {
 
     MiniMRCluster mrCluster = null;
@@ -61,7 +59,7 @@ public class TestJobHistoryServer extend
       LOG.info("******** History Address: " + historyAddress);
 
       conf = mrCluster.createJobConf();
-      createInputFile(conf, "/tmp/input");
+      createInputFile(conf, inputPath);
 
       RunningJob job = runJob(conf);
       LOG.info("Job details: " + job);
@@ -72,6 +70,9 @@ public class TestJobHistoryServer extend
     } catch (IOException e) {
       LOG.error("Failure running test", e);
       Assert.fail(e.getMessage());
+    } catch (InterruptedException e) {
+      LOG.error("Exit due to being interrupted");
+      Assert.fail(e.getMessage());
     } finally {
       if (mrCluster != null) mrCluster.shutdown();
     }
@@ -100,7 +101,7 @@ public class TestJobHistoryServer extend
       LOG.info("******** History Address: " + historyAddress);
 
       conf = mrCluster.createJobConf();
-      createInputFile(conf, "/tmp/input");
+      createInputFile(conf, inputPath);
 
       RunningJob job = runJob(conf);
       LOG.info("Job details: " + job);
@@ -111,6 +112,9 @@ public class TestJobHistoryServer extend
     } catch (IOException e) {
       LOG.error("Failure running test", e);
       Assert.fail(e.getMessage());
+    } catch (InterruptedException e) {
+      LOG.error("Exit due to being interrupted");
+      Assert.fail(e.getMessage());
     } finally {
       if (mrCluster != null) mrCluster.shutdown();
       try {
@@ -145,17 +149,33 @@ public class TestJobHistoryServer extend
     conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
     conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
 
-    FileInputFormat.setInputPaths(conf, "/tmp/input");
+    FileInputFormat.setInputPaths(conf, inputPath);
 
     return JobClient.runJob(conf);
   }
 
-  private String getRedirectUrl(String jobUrl) throws IOException {
+  private String getRedirectUrl(String jobUrl) throws IOException, InterruptedException {
     HttpClient client = new HttpClient();
     GetMethod method = new GetMethod(jobUrl);
     method.setFollowRedirects(false);
     try {
       int status = client.executeMethod(method);
+      if (status != HttpURLConnection.HTTP_MOVED_TEMP) {
+        // The redirect to the history page may not be available yet;
+        // retry a few times with a linearly increasing wait.
+        int retryTimes = 4;
+        for (int i = 1; i <= retryTimes; i++) {
+          try {
+            // Wait i seconds before the next attempt
+            Thread.sleep(i * 1000);
+          } catch (InterruptedException e) {
+            throw new InterruptedException(
+                "Interrupted while waiting for the history redirect");
+          }
+          // Get the latest status
+          status = client.executeMethod(method);
+          if (status == HttpURLConnection.HTTP_MOVED_TEMP) {
+            break;
+          }
+        }
+      }
+
       Assert.assertEquals(status, HttpURLConnection.HTTP_MOVED_TEMP);
 
       LOG.info("Location: " + method.getResponseHeader("Location"));

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapred;
 
 import java.io.DataOutputStream;
@@ -10,7 +25,6 @@ import java.util.HashSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -20,17 +34,35 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.UtilsForTests;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.StaticMapping;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
 
-import junit.framework.TestCase;
-
-public class TestJobInProgress extends TestCase {
+public class TestJobInProgress {
   static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
 
-  private MiniMRCluster mrCluster;
+  private static MiniMRCluster mrCluster;
+
+  private static MiniDFSCluster dfsCluster;
+  private static JobTracker jt;
+
+  static final String trackers[] = new String[] {
+      "tracker_tracker1.r1.com:1000", "tracker_tracker2.r1.com:1000",
+      "tracker_tracker3.r2.com:1000", "tracker_tracker4.r3.com:1000" };
+
+  static final String[] hosts = new String[] { "tracker1.r1.com",
+      "tracker2.r1.com", "tracker3.r2.com", "tracker4.r3.com" };
+
+  static final String[] racks = new String[] { "/r1", "/r1", "/r2", "/r3" };
 
-  private MiniDFSCluster dfsCluster;
-  JobTracker jt;
   private static Path TEST_DIR =
     new Path("/tmp/TestJobInProgress", "jip-testing");
   private static int numSlaves = 4;
@@ -72,22 +104,43 @@ public class TestJobInProgress extends T
 
   }
 
-  @Override
-  protected void setUp() throws Exception {
-    // TODO Auto-generated method stub
-    super.setUp();
+  @BeforeClass
+  public static void setUp() throws Exception {
     Configuration conf = new Configuration();
+    conf.set("mapreduce.jobtracker.address", "localhost:0");
+    conf.set("mapreduce.jobtracker.http.address", "0.0.0.0:0");
+    conf.setClass("topology.node.switch.mapping.impl", StaticMapping.class,
+        DNSToSwitchMapping.class);
     dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
     mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
         .getUri().toString(), 1);
     jt = mrCluster.getJobTrackerRunner().getJobTracker();
+    // Set up the Topology Information
+    for (int i = 0; i < hosts.length; i++) {
+      StaticMapping.addNodeToRack(hosts[i], racks[i]);
+    }
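+    // Register the fake trackers with the JobTracker (initial heartbeat)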
+    for (String s : trackers) {
+      FakeObjectUtilities.establishFirstContact(jt, s);
+    }
   }
 
+  @Test
   public void testPendingMapTaskCount() throws Exception {
     launchTask(FailMapTaskJob.class, IdentityReducer.class);
     checkTaskCounts();
   }
+
+  /** 
+   * Test to ensure that the job works when slow start is used and 
+   * some tasks are allowed to fail  
+   */
+  @Test
+  public void testSlowStartAndFailurePercent() throws Exception {
+    launchTaskSlowStart(FailMapTaskJob.class, IdentityReducer.class);
+    checkTaskCounts();
+  }
   
+  @Test
   public void testPendingReduceTaskCount() throws Exception {
     launchTask(IdentityMapper.class, FailReduceTaskJob.class);
     checkTaskCounts();
@@ -113,7 +166,7 @@ public class TestJobInProgress extends T
     job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
     job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
     
-    // Disable slow-start for reduces since this maps don't complete 
+    // Disable slow-start for reduces since the maps don't complete
     // in these test-cases...
     job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
     
@@ -174,6 +227,7 @@ public class TestJobInProgress extends T
     }
   }
   
+  @Test
   public void testRunningTaskCount() throws Exception {
     // test with spec = false and locality=true
     testRunningTaskCount(false, true);
@@ -188,13 +242,46 @@ public class TestJobInProgress extends T
     testRunningTaskCount(true, false);
   }
   
-  @Override
-  protected void tearDown() throws Exception {
+  @Test
+  public void testLocality() throws Exception {
+    NetworkTopology nt = new NetworkTopology();
+
+    Node r1n1 = new NodeBase("/default/rack1/node1");
+    nt.add(r1n1);
+    Node r1n2 = new NodeBase("/default/rack1/node2");
+    nt.add(r1n2);
+
+    Node r2n3 = new NodeBase("/default/rack2/node3");
+    nt.add(r2n3);
+
+    LOG.debug("r1n1 parent: " + r1n1.getParent() + "\n" +
+              "r1n2 parent: " + r1n2.getParent() + "\n" +
+              "r2n3 parent: " + r2n3.getParent());
+
+    // Same host
+    assertEquals(0, JobInProgress.getMatchingLevelForNodes(r1n1, r1n1, 3));
+    // Same rack
+    assertEquals(1, JobInProgress.getMatchingLevelForNodes(r1n1, r1n2, 3));
+    // Different rack
+    assertEquals(2, JobInProgress.getMatchingLevelForNodes(r1n1, r2n3, 3));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
     mrCluster.shutdown();
     dfsCluster.shutdown();
-    super.tearDown();
   }
   
+  void launchTaskSlowStart(Class MapClass, Class ReduceClass) throws Exception {
+    JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
+    // set it so no reducers start until all maps have finished
+    job.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);
+    // allow all maps to fail
+    job.setInt("mapred.max.map.failures.percent", 100);
+    try {
+      JobClient.runJob(job);
+    } catch (IOException ioe) {
+      // ignored: checkTaskCounts() validates the outcome afterwards
+    }
+  }
 
   void launchTask(Class MapClass,Class ReduceClass) throws Exception{
     JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
@@ -255,5 +342,15 @@ public class TestJobInProgress extends T
       }
     }
   }
-  
+
+  @Test
+  public void testScheduleReducesConsiderFailedMapTips() throws Exception {
+    JobInProgress jip = Mockito.mock(JobInProgress.class);
+    Mockito.when(jip.scheduleReduces()).thenCallRealMethod();
+    jip.failedMapTIPs = 10;
+    jip.finishedMapTasks = 50;
+    jip.completedMapsForReduceSlowstart = 60;
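+    // failed (10) + finished (50) maps meet the slow-start threshold of 60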
+    assertTrue("The Reduce is not scheduled", jip.scheduleReduces());
+  }
+
 }

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestJobLocalizer {
+
+  @Test(timeout = 1000)
+  public void testConcurrentJobLocalizers() throws IOException {
+    final String LOCAL_DIR = "/tmp/mapred/local";
+    JobConf conf = new JobConf(new Configuration());
+    
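+    // Two localizers for different users share the same local dir; each
+    // should end up with its own job-local context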
+    JobLocalizer localizer1 = new JobLocalizer(conf, "user1", "jobid1",
+        LOCAL_DIR);
+    JobLocalizer localizer2 = new JobLocalizer(conf, "user2", "jobid2",
+        LOCAL_DIR);
+    assertTrue("Localizer 1 job local dirs should have user1",
+        localizer1.ttConf.get(JobLocalizer.JOB_LOCAL_CTXT).contains("user1"));
+    assertTrue("Localizer 2 job local dirs should have user2",
+        localizer2.ttConf.get(JobLocalizer.JOB_LOCAL_CTXT).contains("user2"));
+  }
+}

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Jun 21 06:37:27 2013
@@ -238,6 +238,12 @@ public class TestJobQueueTaskScheduler e
       status.setRunState(TaskStatus.State.RUNNING);
       trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // the fake cluster is never in safe mode
+      return false;
+    }
     
   }
   

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java Fri Jun 21 06:37:27 2013
@@ -39,6 +39,13 @@ public class TestJobStatusPersistency ex
   protected void setUp() throws Exception {
     // Don't start anything by default
   }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    FileSystem fs = FileSystem.getLocal(new JobConf());
+    fs.delete(TEST_DIR, true);
+  }
 
   private JobID runJob() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
@@ -88,7 +95,7 @@ public class TestJobStatusPersistency ex
     Properties config = new Properties();
     config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
     config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
-    startCluster(false, config);
+    startCluster(true, config);
     JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj0 = jc.getJob(jobId);
@@ -119,11 +126,8 @@ public class TestJobStatusPersistency ex
   /**
    * Test if the completed job status is persisted to localfs.
    */
   public void testLocalPersistency() throws Exception {
     FileSystem fs = FileSystem.getLocal(new JobConf());
-    
-    fs.delete(TEST_DIR, true);
-    
     Properties config = new Properties();
     config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
     config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
@@ -138,7 +142,6 @@ public class TestJobStatusPersistency ex
     // check if the local fs has the data
     Path jobInfo = new Path(TEST_DIR, rj.getID() + ".info");
     assertTrue("Missing job info from the local fs", fs.exists(jobInfo));
-    fs.delete(TEST_DIR, true);
   }
 
   /**
@@ -153,10 +156,6 @@ public class TestJobStatusPersistency ex
     try {
       FileSystem fs = FileSystem.getLocal(new JobConf());
 
-      if (fs.exists(TEST_DIR) && !fs.delete(TEST_DIR, true)) {
-        fail("Cannot delete TEST_DIR!");
-      }
-
       if (fs.mkdirs(new Path(TEST_DIR, parent))) {
         if (FileUtil.chmod(parent.toUri().getPath(), "-w") != 0) {
           fail("Cannot chmod parent!");

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.util.ServicePlugin;
+import org.junit.Test;
+
+public class TestJobTrackerPlugins extends TestCase {
+  
+  static class FakeServicePlugin implements ServicePlugin {
+
+    private static FakeServicePlugin instance;
+    
+    public static FakeServicePlugin getInstance() {
+      return instance;
+    }
+    
+    private Object service;
+    private boolean stopped;
+    
+    public Object getService() {
+      return service;
+    }
+    
+    public boolean isStopped() {
+      return stopped;
+    }
+    
+    public FakeServicePlugin() {
+      // store static reference to instance so we can retrieve it in the test
+      instance = this;
+    }
+    
+    @Override
+    public void start(Object service) {
+      this.service = service;
+    }
+
+    @Override
+    public void stop() {
+      stopped = true;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+  
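+  // Verifies the plugin lifecycle: instantiated and started with the
+  // JobTracker, stopped when the tracker is closed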
+  @Test
+  public void test() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    conf.setClass("mapreduce.jobtracker.plugins", FakeServicePlugin.class,
+        ServicePlugin.class);
+    
+    assertNull("Plugin not created", FakeServicePlugin.getInstance());
+    
+    JobTracker jobTracker = JobTracker.startTracker(conf);
+    assertNotNull("Plugin created", FakeServicePlugin.getInstance());
+    assertSame("Service is jobTracker",
+        FakeServicePlugin.getInstance().getService(), jobTracker);
+    assertFalse("Plugin not stopped",
+        FakeServicePlugin.getInstance().isStopped());
+    
+    jobTracker.close();
+    assertTrue("Plugin stopped", FakeServicePlugin.getInstance().isStopped());
+  }
+
+}

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.SafeModeAction;
+import org.apache.hadoop.mapred.tools.MRAdmin;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** 
+ * A test for JobTracker safemode. In safemode, no tasks are scheduled, and
+ * no tasks are marked as failed (they are killed instead).
+ */
+public class TestJobTrackerQuiescence {
+  final Path testDir = 
+    new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+  
+  final int maxMapTasks = 1;
+  
+  private MiniDFSCluster dfs;
+  private MiniMRCluster mr;
+  private FileSystem fileSys;
+  private JobTracker jt;
+  
+  private static final Log LOG = 
+    LogFactory.getLog(TestJobTrackerQuiescence.class);
+  
+  @Before
+  public void setUp() throws IOException {
+    
+    Configuration conf = new Configuration();
+    conf.setBoolean("dfs.replication.considerLoad", false);
+    dfs = new MiniDFSCluster(conf, 1, true, null, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    
+    // clean up
+    fileSys.delete(testDir, true);
+    
+    if (!fileSys.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+
+    // Write the input file
+    UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+                            new Path(inDir + "/file"), (short)1);
+
+    dfs.startDataNodes(conf, 1, true, null, null, null, null);
+    dfs.waitActive();
+    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+        + (dfs.getFileSystem()).getUri().getPort();
+    
+    JobConf jtConf = new JobConf();
+    jtConf.setInt("mapred.tasktracker.map.tasks.maximum", maxMapTasks);
+    jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+    jtConf.setBoolean(JobTracker.JT_HDFS_MONITOR_ENABLE, true);
+    jtConf.setInt(JobTracker.JT_HDFS_MONITOR_THREAD_INTERVAL, 1000);
+    mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
+    mr.waitUntilIdle();
+    mr.setInlineCleanupThreads();
+    jt = mr.getJobTrackerRunner().getJobTracker();
+  }
+  
+  @After
+  public void tearDown() {
+    if (mr != null) {
+      try {
+        mr.shutdown();
+      } catch (Exception e) {}
+    }
+    if (dfs != null) {
+      try {
+        dfs.shutdown();
+      } catch (Exception e) {}
+    }
+  }
+  
+  @Test
+  public void testHDFSMonitor() throws Exception {
+    /*
+     * Try 'automatic' safe-mode 
+     */
+    // Put HDFS in safe-mode
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+    int numTries = 20;
+    while (!jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should be in safe-mode
+    assertTrue(jt.isInSafeMode());
+      
+    // Remove HDFS from safe-mode
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+    
+    numTries = 20;
+    while (jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should not be in safe-mode
+    assertFalse(jt.isInSafeMode());
+      
+    /*
+     * Now ensure 'automatic' mode doesn't interfere with 'admin set' safe-mode
+     */
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+    numTries = 20;
+    while (!jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should be in safe-mode
+    assertTrue(jt.isInSafeMode());
+
+    // Now, put JT in admin set safe-mode
+    enterSafeMode();
+    
+    // Bring HDFS back from safe-mode
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+    
+    numTries = 20;
+    while (jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // But now JT should *still* be in safe-mode
+    assertTrue(jt.isInSafeMode());
+    assertTrue(jt.isInAdminSafeMode());
+    
+    // Leave JT safe-mode
+    leaveSafeMode();
+    assertFalse(jt.isInAdminSafeMode());
+    
+    // Bounce HDFS back in-out
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+    Thread.sleep(5000);
+    dfs.getNameNode().setSafeMode(
+        org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+    
+    numTries = 20;
+    while (jt.isInSafeMode() && numTries > 0) {
+      Thread.sleep(1000);
+      --numTries;
+    }
+    
+    // By now JT should not be in safe-mode
+    assertFalse(jt.isInSafeMode());
+      
+  }
+  
+  @Test
+  public void testMRAdminSafeModeWait() throws Exception {
+    
+    enterSafeMode();
+
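+    // Run "mradmin -safemode wait" asynchronously; it should block until the
+    // JobTracker leaves safe mode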
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<Void> future = executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        MRAdmin mrAdmin = new MRAdmin(mr.createJobConf());
+        mrAdmin.run(new String[] { "-safemode", "wait" });
+        return null;
+      }
+    });
+    try {
+      future.get(1, TimeUnit.SECONDS);
+      fail("JT should still be in safemode");
+    } catch (TimeoutException e) {
+      // expected
+    }
+
+    leaveSafeMode();
+
+    try {
+      future.get(10, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      fail("JT should no longer be in safemode");
+    }
+  }
+  
+  @Test
+  public void testJobsPauseInSafeMode() throws Exception {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    int numMaps = 10;
+    int numReds = 1;
+    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+    jobConf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
+    // Configure the job
+    JobConf job = configureJob(jobConf, numMaps, numReds, 
+                               mapSignalFile, redSignalFile);
+      
+    fileSys.delete(shareDir, true);
+    
+    // Submit the job   
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+    
+    // wait for the job to be inited
+    mr.initializeJob(id);
+    
+    // Wait until the job's maps are at least 50% complete
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
+      UtilsForTests.waitFor(10);
+    }
+    assertEquals(numMaps / 2, getCompletedMapCount(rJob));
+
+    enterSafeMode();
+
+    // Signal all the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+    
+    // Signal the reducers to complete
+    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
+                              redSignalFile);
+
+    // only assigned maps complete in safemode since no more maps may be
+    // assigned
+    Thread.sleep(10000);
+    assertEquals(numMaps / 2 + maxMapTasks, getCompletedMapCount(rJob));
+
+    leaveSafeMode();
+    
+    // job completes after leaving safemode
+    UtilsForTests.waitTillDone(jobClient);
+
+    assertTrue(rJob.isSuccessful());
+  }
+  
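+  // Counts the map-task completion events reported for the job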
+  private int getCompletedMapCount(RunningJob rJob) throws IOException {
+    TaskCompletionEvent[] taskCompletionEvents = rJob.getTaskCompletionEvents(0);
+    int mapCount = 0;
+    for (TaskCompletionEvent tce : taskCompletionEvents) {
+      if (tce.isMap) {
+        mapCount++;
+      }
+    }
+    return mapCount;
+  }
+
+  private JobConf configureJob(JobConf conf, int maps, int reduces,
+      String mapSignal, String redSignal) throws IOException {
+    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, maps,
+        reduces, "test-jt-safemode", mapSignal, redSignal);
+    return conf;
+  }
+  
+  private void enterSafeMode() throws IOException {
+    jt.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+  }
+
+  private void leaveSafeMode() throws IOException {
+    jt.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+  }
+}

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Fri Jun 21 06:37:27 2013
@@ -17,23 +17,24 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.TestJobTrackerRestart;
+import org.junit.Test;
 
-import junit.framework.TestCase;
 import java.io.*;
-import org.junit.*;
 
 /** 
  * This test checks if the jobtracker can detect and recover a tracker that was
  * lost while the jobtracker was down.
  */
-/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
- */
-@Ignore
-public class TestJobTrackerRestartWithLostTracker extends TestCase {
+public class TestJobTrackerRestartWithLostTracker {
   final Path testDir = new Path("/jt-restart-lost-tt-testing");
   final Path inDir = new Path(testDir, "input");
   final Path shareDir = new Path(testDir, "share");
@@ -53,11 +54,14 @@ public class TestJobTrackerRestartWithLo
   throws IOException {
     FileSystem fileSys = dfs.getFileSystem();
     JobConf jobConf = mr.createJobConf();
-    int numMaps = 50;
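+    // a couple of maps are enough to exercise recovery after a lost tracker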
+    int numMaps = 2;
     int numReds = 1;
     String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
     String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
-    
+
+    // Enable recovery on restart
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
     // Configure the jobs
     JobConf job = configureJob(jobConf, numMaps, numReds, 
                                mapSignalFile, redSignalFile);
@@ -84,10 +88,6 @@ public class TestJobTrackerRestartWithLo
     // Signal the maps to complete
     UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
     
-    // Enable recovery on restart
-    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
-                                      true);
-    
     // Kill the 2nd tasktracker
     mr.stopTaskTracker(1);
     
@@ -102,6 +102,8 @@ public class TestJobTrackerRestartWithLo
     // Wait for the JT to be ready
     UtilsForTests.waitForJobTracker(jobClient);
 
+    // Signal the maps to complete
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
     // Signal the reducers to complete
     UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
                               redSignalFile);
@@ -113,11 +115,9 @@ public class TestJobTrackerRestartWithLo
                  + "upon restart", 
                  jobClient.getClusterStatus().getTaskTrackers(), 1);
 
-    // validate the history file
-    TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
-    TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
+    assertTrue("Job should be successful", rJob.isSuccessful());
   }
-  
+
+  @Test
   public void testRestartWithLostTracker() throws IOException {
     String namenode = null;
     MiniDFSCluster dfs = null;