You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC
svn commit: r1495297 [37/46] - in /hadoop/common/branches/branch-1-win: ./
bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/
src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import junit.framework.TestCase;
+
+public class TestCompressionStreamReuse extends TestCase {
+ private static final Log LOG = LogFactory
+ .getLog(TestCompressionStreamReuse.class);
+
+ private Configuration conf = new Configuration();
+ private int count = 10000;
+ private int seed = new Random().nextInt();
+
+ public void testBZip2Codec() throws IOException {
+ resetStateTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.BZip2Codec");
+ }
+
+ public void testGzipCompressStreamReuse() throws IOException {
+ resetStateTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.GzipCodec");
+ }
+
+ public void testGzipCompressStreamReuseWithParam() throws IOException {
+ Configuration conf = new Configuration(this.conf);
+ ZlibFactory
+ .setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+ ZlibFactory.setCompressionStrategy(conf,
+ CompressionStrategy.HUFFMAN_ONLY);
+ resetStateTest(conf, seed, count,
+ "org.apache.hadoop.io.compress.GzipCodec");
+ }
+
+ private static void resetStateTest(Configuration conf, int seed, int count,
+ String codecClass) throws IOException {
+ // Create the codec
+ CompressionCodec codec = null;
+ try {
+ codec = (CompressionCodec) ReflectionUtils.newInstance(conf
+ .getClassByName(codecClass), conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Illegal codec!");
+ }
+ LOG.info("Created a Codec object of type: " + codecClass);
+
+ // Generate data
+ DataOutputBuffer data = new DataOutputBuffer();
+ RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+ for (int i = 0; i < count; ++i) {
+ generator.next();
+ RandomDatum key = generator.getKey();
+ RandomDatum value = generator.getValue();
+
+ key.write(data);
+ value.write(data);
+ }
+ LOG.info("Generated " + count + " records");
+
+ // Compress data
+ DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
+ DataOutputStream deflateOut = new DataOutputStream(
+ new BufferedOutputStream(compressedDataBuffer));
+ CompressionOutputStream deflateFilter = codec
+ .createOutputStream(deflateOut);
+ deflateFilter.write(data.getData(), 0, data.getLength());
+ deflateFilter.finish();
+ deflateFilter.flush();
+ LOG.info("Finished compressing data");
+
+ // reset deflator
+ deflateFilter.resetState();
+ LOG.info("Finished reseting deflator");
+
+ // re-generate data
+ data.reset();
+ generator = new RandomDatum.Generator(seed);
+ for (int i = 0; i < count; ++i) {
+ generator.next();
+ RandomDatum key = generator.getKey();
+ RandomDatum value = generator.getValue();
+
+ key.write(data);
+ value.write(data);
+ }
+ DataInputBuffer originalData = new DataInputBuffer();
+ DataInputStream originalIn = new DataInputStream(
+ new BufferedInputStream(originalData));
+ originalData.reset(data.getData(), 0, data.getLength());
+
+ // re-compress data
+ compressedDataBuffer.reset();
+ deflateOut = new DataOutputStream(new BufferedOutputStream(
+ compressedDataBuffer));
+ deflateFilter = codec.createOutputStream(deflateOut);
+
+ deflateFilter.write(data.getData(), 0, data.getLength());
+ deflateFilter.finish();
+ deflateFilter.flush();
+ LOG.info("Finished re-compressing data");
+
+ // De-compress data
+ DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
+ deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
+ compressedDataBuffer.getLength());
+ CompressionInputStream inflateFilter = codec
+ .createInputStream(deCompressedDataBuffer);
+ DataInputStream inflateIn = new DataInputStream(
+ new BufferedInputStream(inflateFilter));
+
+ // Check
+ for (int i = 0; i < count; ++i) {
+ RandomDatum k1 = new RandomDatum();
+ RandomDatum v1 = new RandomDatum();
+ k1.readFields(originalIn);
+ v1.readFields(originalIn);
+
+ RandomDatum k2 = new RandomDatum();
+ RandomDatum v2 = new RandomDatum();
+ k2.readFields(inflateIn);
+ v2.readFields(inflateIn);
+ assertTrue(
+ "original and compressed-then-decompressed-output not equal",
+ k1.equals(k2) && v1.equals(v2));
+ }
+ LOG.info("SUCCESS! Completed checking " + count + " records");
+ }
+}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/io/nativeio/TestNativeIO.java Fri Jun 21 06:37:27 2013
@@ -272,6 +272,65 @@ public class TestNativeIO {
assertPermissions(toChmod, 0644);
}
+ @Test
+ public void testPosixFadvise() throws Exception {
+ FileInputStream fis = new FileInputStream("/dev/zero");
+ try {
+ NativeIO.posix_fadvise(fis.getFD(), 0, 0,
+ NativeIO.POSIX_FADV_SEQUENTIAL);
+ } catch (UnsupportedOperationException uoe) {
+ // we should just skip the unit test on machines where we don't
+ // have fadvise support
+ assumeTrue(false);
+ } finally {
+ fis.close();
+ }
+
+ try {
+ NativeIO.posix_fadvise(fis.getFD(), 0, 1024,
+ NativeIO.POSIX_FADV_SEQUENTIAL);
+
+ fail("Did not throw on bad file");
+ } catch (NativeIOException nioe) {
+ assertEquals(Errno.EBADF, nioe.getErrno());
+ }
+
+ try {
+ NativeIO.posix_fadvise(null, 0, 1024,
+ NativeIO.POSIX_FADV_SEQUENTIAL);
+
+ fail("Did not throw on null file");
+ } catch (NullPointerException npe) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testSyncFileRange() throws Exception {
+ FileOutputStream fos = new FileOutputStream(
+ new File(TEST_DIR, "testSyncFileRange"));
+ try {
+ fos.write("foo".getBytes());
+ NativeIO.sync_file_range(fos.getFD(), 0, 1024,
+ NativeIO.SYNC_FILE_RANGE_WRITE);
+ // no way to verify that this actually has synced,
+ // but if it doesn't throw, we can assume it worked
+ } catch (UnsupportedOperationException uoe) {
+ // we should just skip the unit test on machines where we don't
+ // have fadvise support
+ assumeTrue(false);
+ } finally {
+ fos.close();
+ }
+ try {
+ NativeIO.sync_file_range(fos.getFD(), 0, 1024,
+ NativeIO.SYNC_FILE_RANGE_WRITE);
+ fail("Did not throw on bad file");
+ } catch (NativeIOException nioe) {
+ assertEquals(Errno.EBADF, nioe.getErrno());
+ }
+ }
+
private void assertPermissions(File f, int expected) throws IOException {
FileSystem localfs = FileSystem.getLocal(new Configuration());
FsPermission perms = localfs.getFileStatus(
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Jun 21 06:37:27 2013
@@ -18,31 +18,38 @@
package org.apache.hadoop.ipc;
-import org.apache.hadoop.metrics2.MetricsSource;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.lang.reflect.Method;
-
-import junit.framework.TestCase;
-
import java.util.Arrays;
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcInstrumentation;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.AccessControlException;
-import static org.apache.hadoop.test.MetricsAsserts.*;
/** Unit tests for RPC. */
public class TestRPC extends TestCase {
@@ -133,6 +140,14 @@ public class TestRPC extends TestCase {
}
}
+ public static class TestVersionMismatchImpl extends TestImpl {
+ /** @return a different version. */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return super.getProtocolVersion(protocol, clientVersion) + 1;
+ }
+ }
+
//
// an object that does a bunch of transactions
//
@@ -398,6 +413,50 @@ public class TestRPC extends TestCase {
assertCounter("rpcAuthenticationSuccesses", 0, rb);
}
}
+
+ /**
+ * Count the number of threads that have a stack frame containing
+ * the given string
+ */
+ private static int countThreads(String search) {
+ ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+ int count = 0;
+ ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
+ for (ThreadInfo info : infos) {
+ if (info == null) continue;
+ for (StackTraceElement elem : info.getStackTrace()) {
+ if (elem.getClassName().contains(search)) {
+ count++;
+ break;
+ }
+ }
+ }
+ return count;
+ }
+
+
+ /**
+ * Test that server.stop() properly stops all threads
+ */
+ public void testStopsAllThreads() throws Exception {
+ int threadsBefore = countThreads("Server$Listener$Reader");
+ assertEquals("Expect no Reader threads running before test",
+ 0, threadsBefore);
+
+ final Server server = RPC.getServer(new TestImpl(), ADDRESS,
+ 0, 5, true, conf);
+ server.start();
+ try {
+ int threadsRunning = countThreads("Server$Listener$Reader");
+ assertTrue(threadsRunning > 0);
+ } finally {
+ server.stop();
+ }
+ int threadsAfter = countThreads("Server$Listener$Reader");
+ assertEquals("Expect no Reader threads left running after test",
+ 0, threadsAfter);
+ }
public void testAuthorization() throws Exception {
Configuration conf = new Configuration();
@@ -480,6 +539,35 @@ public class TestRPC extends TestCase {
}
assertTrue(succeeded);
}
+
+ /** Test RPC.checkVersion method. */
+ public void testCheckVersion() throws Exception {
+ Server server = RPC.getServer(new TestVersionMismatchImpl(), ADDRESS, 0, conf);
+ TestProtocol proxy = null;
+ try {
+ server.start();
+
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ // get proxy should succeed
+ proxy = (TestProtocol)RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID, addr, ugi, conf,
+ NetUtils.getSocketFactory(conf, TestProtocol.class), 0, null, false);
+
+ try {
+ RPC.checkVersion(TestProtocol.class, TestProtocol.versionID, proxy);
+ fail("Check version should throw VersionMismatch");
+ } catch(VersionMismatch vm) {
+ LOG.info("The VersionMismatch is expected", vm);
+ }
+ } finally {
+ server.stop();
+ if (proxy!=null) {
+ RPC.stopProxy(proxy);
+ }
+ }
+ }
public static void main(String[] args) throws Exception {
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestSaslRPC.java Fri Jun 21 06:37:27 2013
@@ -366,7 +366,10 @@ public class TestSaslRPC {
server.start();
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
- final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ // don't use what the rpc server claims it's bound to since it's the
+ // client's responsibility to set the service
+ final InetSocketAddress addr = NetUtils.createSocketAddr(
+ ADDRESS, server.getListenerAddress().getPort());
TestTokenIdentifier tokenId = new TestTokenIdentifier(new Text(current
.getUserName()));
Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/ipc/TestServer.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+/**
+ * This is intended to be a set of unit tests for the
+ * org.apache.hadoop.ipc.Server class.
+ */
+public class TestServer {
+
+ @Test
+ public void testExceptionsHandler() throws IOException {
+ Server.ExceptionsHandler handler = new Server.ExceptionsHandler();
+ handler.addTerseExceptions(IOException.class);
+ handler.addTerseExceptions(RemoteException.class);
+
+ assertTrue(handler.isTerse(IOException.class));
+ assertTrue(handler.isTerse(RemoteException.class));
+ }
+}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java Fri Jun 21 06:37:27 2013
@@ -31,6 +31,11 @@ public interface MiniMRClientCluster {
public void start() throws IOException;
+ /**
+ * Stop and start back the cluster using the same configuration.
+ */
+ public void restart() throws IOException;
+
public void stop() throws IOException;
public Configuration getConfig() throws IOException;
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Jun 21 06:37:27 2013
@@ -56,7 +56,7 @@ public class MiniMRCluster {
private String namenode;
private UserGroupInformation ugi = null;
- private JobConf conf;
+ protected JobConf conf;
private int numTrackerToExclude;
private JobConf job;
@@ -100,8 +100,10 @@ public class MiniMRCluster {
public void run() {
try {
jc = (jc == null) ? createJobConf() : createJobConf(jc);
- File f = new File("build/test/mapred/local").getAbsoluteFile();
- jc.set("mapred.local.dir",f.getAbsolutePath());
+ String localPath = System.getProperty("test.build.data",
+ "build/test/mapred/local");
+ File f = new File(localPath).getAbsoluteFile();
+ jc.set("mapred.local.dir", f.getAbsolutePath());
jc.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
final String id =
@@ -333,7 +335,8 @@ public class MiniMRCluster {
private void waitTaskTrackers() {
for(Iterator<TaskTrackerRunner> itr= taskTrackerList.iterator(); itr.hasNext();) {
TaskTrackerRunner runner = itr.next();
- while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
+ while (!runner.isDead && (!runner.isInitialized
+ || !runner.tt.isIdleAndClean())) {
if (!runner.isInitialized) {
LOG.info("Waiting for task tracker to start.");
} else {
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java Fri Jun 21 06:37:27 2013
@@ -20,7 +20,11 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
/**
* An adapter for MiniMRCluster providing a MiniMRClientCluster interface. This
@@ -30,6 +34,8 @@ public class MiniMRClusterAdapter implem
private MiniMRCluster miniMRCluster;
+ private static final Log LOG = LogFactory.getLog(MiniMRClusterAdapter.class);
+
public MiniMRClusterAdapter(MiniMRCluster miniMRCluster) {
this.miniMRCluster = miniMRCluster;
}
@@ -50,4 +56,37 @@ public class MiniMRClusterAdapter implem
miniMRCluster.shutdown();
}
+ @Override
+ public void restart() throws IOException {
+ if (!miniMRCluster.getJobTrackerRunner().isActive()) {
+ LOG.warn("Cannot restart the mini cluster, start it first");
+ return;
+ }
+
+ int jobTrackerPort = miniMRCluster.getJobTrackerPort();
+ int taskTrackerPort = getConfig().getInt(
+ "mapred.task.tracker.report.address", 0);
+ int numtaskTrackers = miniMRCluster.getNumTaskTrackers();
+ String namenode = getConfig().get(FileSystem.FS_DEFAULT_NAME_KEY);
+
+ stop();
+
+ // keep retrying to start the cluster for a max. of 30 sec.
+ for (int i = 0; i < 30; i++) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ try {
+ miniMRCluster = new MiniMRCluster(jobTrackerPort, taskTrackerPort,
+ numtaskTrackers, namenode, 1);
+ break;
+ } catch (Exception e) {
+ LOG.info("Retrying to start the cluster");
+ }
+ }
+
+ }
+
}
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/MiniMRClusterWithNodeGroup.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+
+public class MiniMRClusterWithNodeGroup extends MiniMRCluster {
+
+ private String[] nodeGroups;
+
+ public MiniMRClusterWithNodeGroup(int numTaskTrackers, String namenode, int numDir,
+ String[] racks, String[] nodeGroups, String[] hosts, JobConf conf) throws IOException {
+ super(numTaskTrackers, namenode, numDir, racks, hosts, conf);
+ this.nodeGroups = nodeGroups;
+ }
+
+ /**
+ * Start the tasktracker.
+ */
+ @Override
+ public void startTaskTracker(String host, String rack,
+ int idx, int numDir) throws IOException {
+ if (rack != null && nodeGroups != null) {
+ StaticMapping.addNodeToRack(host, rack + nodeGroups);
+ }
+
+ if (host != null) {
+ NetUtils.addStaticResolution(host, "localhost");
+ }
+
+ TaskTrackerRunner taskTracker;
+ taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
+
+ addTaskTracker(taskTracker);
+ }
+
+}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/NotificationTestCase.java Fri Jun 21 06:37:27 2013
@@ -95,7 +95,7 @@ public abstract class NotificationTestCa
}
public static class NotificationServlet extends HttpServlet {
- public static int counter = 0;
+ public static volatile int counter = 0;
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
@@ -159,9 +159,10 @@ public abstract class NotificationTestCa
public void testMR() throws Exception {
System.out.println(launchWordCount(this.createJobConf(),
"a b c d e f g h", 1, 1));
- synchronized(Thread.currentThread()) {
- stdPrintln("Sleeping for 2 seconds to give time for retry");
- Thread.currentThread().sleep(2000);
+ boolean keepTrying = true;
+ for (int tries = 0; tries < 30 && keepTrying; tries++) {
+ Thread.sleep(50);
+ keepTrying = !(NotificationServlet.counter == 2);
}
assertEquals(2, NotificationServlet.counter);
@@ -179,18 +180,20 @@ public abstract class NotificationTestCa
// run a job with KILLED status
System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
outDir).getID());
- synchronized(Thread.currentThread()) {
- stdPrintln("Sleeping for 2 seconds to give time for retry");
- Thread.currentThread().sleep(2000);
+ keepTrying = true;
+ for (int tries = 0; tries < 30 && keepTrying; tries++) {
+ Thread.sleep(50);
+ keepTrying = !(NotificationServlet.counter == 4);
}
assertEquals(4, NotificationServlet.counter);
// run a job with FAILED status
System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
outDir).getID());
- synchronized(Thread.currentThread()) {
- stdPrintln("Sleeping for 2 seconds to give time for retry");
- Thread.currentThread().sleep(2000);
+ keepTrying = true;
+ for (int tries = 0; tries < 30 && keepTrying; tries++) {
+ Thread.sleep(50);
+ keepTrying = !(NotificationServlet.counter == 6);
}
assertEquals(6, NotificationServlet.counter);
}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestAuditLogger.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
package org.apache.hadoop.mapred;
import java.net.InetAddress;
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestClusterStatus.java Fri Jun 21 06:37:27 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred;
+import static org.junit.Assert.assertEquals;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -24,14 +26,11 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/**
* Class to test that ClusterMetrics are being created with the right
@@ -39,41 +38,44 @@ import junit.framework.TestSuite;
*
* The tests exercise code paths where the counts of slots are updated.
*/
-public class TestClusterStatus extends TestCase {
+public class TestClusterStatus {
private static String[] trackers = new String[] { "tracker_tracker1:1000",
"tracker_tracker2:1000", "tracker_tracker3:1000" };
- private static JobTracker jobTracker;
- private static int mapSlotsPerTracker = 4;
- private static int reduceSlotsPerTracker = 2;
- private static MiniMRCluster mr;
- private static JobClient client;
+ private JobTracker jobTracker;
+ private final static int mapSlotsPerTracker = 4;
+ private final static int reduceSlotsPerTracker = 2;
+ private MiniMRCluster mr;
+ private JobClient client;
// heartbeat responseId. increment this after sending a heartbeat
- private static short responseId = 1;
+ private short responseId;
private static FakeJobInProgress fakeJob;
private static FakeTaskScheduler scheduler;
- public static Test suite() {
- TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
- protected void setUp() throws Exception {
- JobConf conf = new JobConf();
- conf.setClass("mapred.jobtracker.taskScheduler",
- TestClusterStatus.FakeTaskScheduler.class,
- TaskScheduler.class);
- mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
- jobTracker = mr.getJobTrackerRunner().getJobTracker();
- for (String tracker : trackers) {
- establishFirstContact(jobTracker, tracker);
- }
- client = new JobClient(mr.createJobConf());
- }
-
- protected void tearDown() throws Exception {
- client.close();
- mr.shutdown();
- }
- };
- return setup;
+ @Before
+ public void setUp() throws Exception {
+ responseId = 1;
+ JobConf conf = new JobConf();
+ conf.setClass("mapred.jobtracker.taskScheduler",
+ TestClusterStatus.FakeTaskScheduler.class,
+ TaskScheduler.class);
+ mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
+ jobTracker = mr.getJobTrackerRunner().getJobTracker();
+ for (String tracker : trackers) {
+ establishFirstContact(jobTracker, tracker);
+ }
+ client = new JobClient(mr.createJobConf());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ client.close();
+ mr.shutdown();
+ fakeJob = null;
+ scheduler = null;
+ client = null;
+ mr = null;
+ jobTracker = null;
}
/**
@@ -157,6 +159,7 @@ public class TestClusterStatus extends T
taskStatuses, 0, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
}
+ @Test
public void testClusterMetrics() throws IOException, InterruptedException {
assertEquals("tasktracker count doesn't match", trackers.length,
client.getClusterStatus().getTaskTrackers());
@@ -256,6 +259,7 @@ public class TestClusterStatus extends T
list.add(ts);
}
+ @Test
public void testReservedSlots() throws IOException {
JobConf conf = mr.createJobConf();
@@ -308,6 +312,7 @@ public class TestClusterStatus extends T
0, metrics.getReservedReduceSlots());
}
+ @Test
public void testClusterStatus() throws Exception {
ClusterStatus clusterStatus = client.getClusterStatus();
assertEquals("JobTracker used-memory is " + clusterStatus.getUsedMemory() +
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestIFileStreams.java Fri Jun 21 06:37:27 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
ifos.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 104);
+ IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
for (int i = 0; i < DLEN; ++i) {
assertEquals(i, ifis.read());
}
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
final byte[] b = dob.getData();
++b[17];
dib.reset(b, DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 104);
+ IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
int i = 0;
try {
while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
ifos.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 100);
+ IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
int i = 0;
try {
while (i < DLEN - 8) {
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS.TestResult;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobClientRetries {
+
+ private static final Log LOG = LogFactory.getLog(TestJobClientRetries.class);
+
+ MiniMRCluster mr;
+
+ @Test
+ public void testJobSubmission() throws Exception {
+
+ // Start MR cluster
+ mr = new MiniMRCluster(2, "file:///", 3);
+
+ final List<Exception> exceptions = new ArrayList<Exception>();
+
+ // Get jobConf
+ final JobConf jobConf = mr.createJobConf();
+
+ // Stop JobTracker
+ LOG.info("Stopping JobTracker");
+ mr.stopJobTracker();
+
+ /*
+ * Submit job *after* setting job-client retries to be *on*...
+ * the test *should* fail without this config being set
+ */
+ LOG.info("Stopping JobTracker");
+ jobConf.setBoolean(
+ JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+ WordCountThread wc = new WordCountThread(jobConf, exceptions);
+ wc.start();
+
+ // Restart JobTracker after a little while
+ Thread.sleep(5000);
+ LOG.info("Re-starting JobTracker for job-submission to go through");
+ mr.startJobTracker();
+
+ // Wait for the job to complete or for an exception to occur
+ LOG.info("Waiting for job success/failure ...");
+ wc.join();
+
+ Assert.assertNotNull(wc.result);
+ Assert.assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+ "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", wc.result.output);
+ Assert.assertTrue("exceptions is not empty: " + exceptions, exceptions.isEmpty());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ mr.shutdown();
+ }
+
+ public static class WordCountThread extends Thread {
+ JobConf jobConf;
+ List<Exception> exceptions;
+ TestResult result;
+
+ public WordCountThread(JobConf jobConf, List<Exception> exceptions) {
+ super(WordCountThread.class.getName());
+ this.jobConf = jobConf;
+ this.exceptions = exceptions;
+ }
+
+ @Override
+ public void run() {
+ try {
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ Path testdir = new Path(
+ System.getProperty("test.build.data","/tmp")).makeQualified(fs);
+ final Path inDir = new Path(testdir, "input");
+ final Path outDir = new Path(testdir, "output");
+ String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+ LOG.info("Starting word-count");
+ result =
+ TestMiniMRWithDFS.launchWordCount(
+ jobConf, inDir, outDir, input, 3, 1);
+ LOG.info("Finished word-count");
+ } catch (Exception e) {
+ LOG.error("Caught exception during word-count", e);
+ exceptions.add(e);
+ result = null;
+ }
+ }
+ }
+}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Jun 21 06:37:27 2013
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
@@ -34,6 +35,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,6 +48,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.Shell;
/**
* Tests the JobHistory files - to catch any changes to JobHistory that can
@@ -381,6 +384,23 @@ public class TestJobHistory extends Test
(status.equals("SUCCESS") || status.equals("FAILED") ||
status.equals("KILLED")));
+ // Validate task Avataar
+ String avataar = attempt.get(Keys.AVATAAR);
+ assertTrue("Unexpected LOCALITY \"" + avataar + "\" is seen in " +
+ " history file for task attempt " + id,
+ (avataar.equals("VIRGIN") || avataar.equals("SPECULATIVE"))
+ );
+
+ // Map Task Attempts should have valid LOCALITY
+ if (type.equals("MAP")) {
+ String locality = attempt.get(Keys.LOCALITY);
+ assertTrue("Unexpected LOCALITY \"" + locality + "\" is seen in " +
+ " history file for task attempt " + id,
+ (locality.equals("NODE_LOCAL") || locality.equals("GROUP_LOCAL") ||
+ locality.equals("RACK_LOCAL") || locality.equals("OFF_SWITCH"))
+ );
+ }
+
// Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
// SORT_FINISHED time
if (type.equals("REDUCE") && status.equals("SUCCESS")) {
@@ -823,10 +843,33 @@ public class TestJobHistory extends Test
// Validate the job queue name
assertTrue(jobInfo.getJobQueue().equals(conf.getQueueName()));
+
+ // Validate the workflow properties
+ assertTrue(jobInfo.get(Keys.WORKFLOW_ID).equals(
+ conf.get(JobConf.WORKFLOW_ID, "")));
+ assertTrue(jobInfo.get(Keys.WORKFLOW_NAME).equals(
+ conf.get(JobConf.WORKFLOW_NAME, "")));
+ assertTrue(jobInfo.get(Keys.WORKFLOW_NODE_NAME).equals(
+ conf.get(JobConf.WORKFLOW_NODE_NAME, "")));
+ assertTrue(jobInfo.get(Keys.WORKFLOW_ADJACENCIES).equals(
+ JobHistory.JobInfo.getWorkflowAdjacencies(conf)));
+ assertTrue(jobInfo.get(Keys.WORKFLOW_TAGS).equals(
+ conf.get(JobConf.WORKFLOW_TAGS, "")));
+ }
+
+ public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
+ runDoneFolderTest("history_done");
+ }
+
+ public void testDoneFolderNotOnDefaultFileSystem() throws IOException,
+ InterruptedException {
+ runDoneFolderTest("file:///" + System.getProperty("test.build.data", "tmp")
+ + "/history_done");
}
- public void testDoneFolderOnHDFS() throws IOException {
+ private void runDoneFolderTest(String doneFolder) throws IOException, InterruptedException {
MiniMRCluster mr = null;
+ MiniDFSCluster dfsCluster = null;
try {
JobConf conf = new JobConf();
// keep for less time
@@ -834,10 +877,9 @@ public class TestJobHistory extends Test
conf.setLong("mapred.jobtracker.retirejob.interval", 100000);
//set the done folder location
- String doneFolder = "history_done";
conf.set("mapred.job.tracker.history.completed.location", doneFolder);
- MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
+ dfsCluster = new MiniDFSCluster(conf, 2, true, null);
mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
3, null, null, conf);
@@ -863,7 +905,7 @@ public class TestJobHistory extends Test
Path doneDir = JobHistory.getCompletedJobHistoryLocation();
assertEquals("History DONE folder not correct",
- doneFolder, doneDir.getName());
+ new Path(doneFolder).getName(), doneDir.getName());
JobID id = job.getID();
String logFileName = getDoneFile(conf, id, doneDir);
assertNotNull(logFileName);
@@ -894,28 +936,33 @@ public class TestJobHistory extends Test
// Test that all of the ancestors of the log file have the same
// permissions as the done directory
- Path cursor = logFile.getParent();
-
- Path doneParent = doneDir.getParent();
-
- FsPermission donePermission = getStatus(fileSys, doneDir).getPermission();
-
- System.err.println("testDoneFolderOnHDFS: done dir permission = "
- + donePermission);
+ // The folders between the done folder and the folder containing the
+ // log file are created automatically. Since the default permission
+ // on Windows may not be the same as JobHistory.HISTORY_DIR_PERMISSION
+ // so we skip this check if the file system is local file system
+ // and is windows
+ if (!(fileSys instanceof LocalFileSystem && Shell.WINDOWS)) {
+ Path cursor = logFile.getParent();
+
+ Path doneParent = doneDir.getParent();
+
+ FsPermission donePermission = getStatus(fileSys, doneDir)
+ .getPermission();
+
+ System.err.println("testDoneFolderOnHDFS: done dir permission = "
+ + donePermission);
+
+ while (!cursor.equals(doneParent)) {
+ FileStatus cursorStatus = getStatus(fileSys, cursor);
+ FsPermission cursorPermission = cursorStatus.getPermission();
+
+ assertEquals("testDoneFolder: A done directory descendant, " + cursor
+ + " does not have the same permisison as the done directory, "
+ + doneDir, donePermission, cursorPermission);
- while (!cursor.equals(doneParent)) {
- FileStatus cursorStatus = getStatus(fileSys, cursor);
- FsPermission cursorPermission = cursorStatus.getPermission();
-
- assertEquals("testDoneFolderOnHDFS: A done directory descendant, "
- + cursor
- + " does not have the same permisison as the done directory, "
- + doneDir,
- donePermission,
- cursorPermission);
-
- cursor = cursor.getParent();
- }
+ cursor = cursor.getParent();
+ }
+ }
// check if the job file is removed from the history location
Path runningJobsHistoryFolder = logFile.getParent().getParent();
@@ -937,6 +984,10 @@ public class TestJobHistory extends Test
cleanupLocalFiles(mr);
mr.shutdown();
}
+
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
}
}
@@ -980,7 +1031,21 @@ public class TestJobHistory extends Test
// no queue admins for default queue
conf.set(QueueManager.toFullPropertyName(
"default", QueueACL.ADMINISTER_JOBS.getAclName()), " ");
-
+
+ // set workflow properties
+ conf.set(JobConf.WORKFLOW_ID, "workflowId1");
+ conf.set(JobConf.WORKFLOW_NAME, "workflowName1");
+ String workflowNodeName = "A";
+ conf.set(JobConf.WORKFLOW_NODE_NAME, workflowNodeName);
+ conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName,
+ "BC");
+ conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + workflowNodeName,
+ "DEF");
+ conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "DEF", "G");
+ conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "Z",
+ workflowNodeName);
+ conf.set(JobConf.WORKFLOW_TAGS, "tag1,tag2");
+
mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
// run the TCs
@@ -1292,4 +1357,47 @@ public class TestJobHistory extends Test
}
}
}
+
+ public void testJobHistoryCleaner() throws Exception {
+ JobConf conf = new JobConf();
+ FileSystem fs = FileSystem.get(conf);
+ JobHistory.DONEDIR_FS = fs;
+ JobHistory.DONE = new Path(TEST_ROOT_DIR + "/done");
+ Path histDirOld = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/05/000000/");
+ Path histDirOnLine = new Path(JobHistory.DONE, "version-1/jtinstid/2013/02/06/000000/");
+ final int dayMillis = 1000 * 60 * 60 * 24;
+
+ try {
+ Calendar runTime = Calendar.getInstance();
+ runTime.clear();
+ runTime.set(2013, 1, 8, 12, 0);
+ long runTimeMillis = runTime.getTimeInMillis();
+
+ fs.mkdirs(histDirOld);
+ fs.mkdirs(histDirOnLine);
+ Path histFileOldDir = new Path(histDirOld, "jobfile1.txt");
+ Path histFileOnLineDir = new Path(histDirOnLine, "jobfile1.txt");
+ Path histFileDontDelete = new Path(histDirOnLine, "jobfile2.txt");
+ fs.create(histFileOldDir).close();
+ fs.create(histFileOnLineDir).close();
+ fs.create(histFileDontDelete).close();
+ new File(histFileOnLineDir.toUri()).setLastModified(
+ runTimeMillis - dayMillis * 5 / 2);
+ new File(histFileDontDelete.toUri()).setLastModified(
+ runTimeMillis - dayMillis * 3 / 2);
+
+ HistoryCleaner.maxAgeOfHistoryFiles = dayMillis * 2; // two days
+ HistoryCleaner historyCleaner = new HistoryCleaner();
+
+ historyCleaner.clean(runTimeMillis);
+
+ assertFalse(fs.exists(histDirOld));
+ assertTrue(fs.exists(histDirOnLine));
+ assertFalse(fs.exists(histFileOldDir));
+ assertFalse(fs.exists(histFileOnLineDir));
+ assertTrue(fs.exists(histFileDontDelete));
+ } finally {
+ fs.delete(JobHistory.DONE, true);
+ }
+ }
}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java Fri Jun 21 06:37:27 2013
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.mapred;
+import java.util.Date;
import java.io.IOException;
import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
@@ -115,7 +117,8 @@ public class TestJobHistoryConfig extend
conf.setQueueName("default");
String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
"/tmp")).toString().replace(' ', '+');
- JobTracker jt = JobTracker.startTracker(conf);
+ String uniqid = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
+ JobTracker jt = JobTracker.startTracker(conf, uniqid, true);
assertTrue(jt != null);
JobInProgress jip = new JobInProgress(new JobID("jt", 1),
new JobConf(conf), jt);
@@ -140,8 +143,9 @@ public class TestJobHistoryConfig extend
private boolean canStartJobTracker(JobConf conf) throws InterruptedException,
IOException {
JobTracker jt = null;
+ String uniqid = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
try {
- jt = JobTracker.startTracker(conf);
+ jt = JobTracker.startTracker(conf, uniqid, true);
Log.info("Started JobTracker");
} catch (IOException e) {
Log.info("Can not Start JobTracker", e.getLocalizedMessage());
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java Fri Jun 21 06:37:27 2013
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFac
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
@@ -34,14 +33,13 @@ import org.junit.Assert;
import junit.framework.TestCase;
import java.io.IOException;
-import java.io.InputStream;
import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.MalformedURLException;
public class TestJobHistoryServer extends TestCase {
private static final Log LOG = LogFactory.getLog(TestJobHistoryServer.class);
-
+ private String inputPath = System.getProperty("test.build.data",
+ "build/test/data") + "/TestJobHistoryServer";
+
public void testHistoryServerEmbedded() {
MiniMRCluster mrCluster = null;
@@ -61,7 +59,7 @@ public class TestJobHistoryServer extend
LOG.info("******** History Address: " + historyAddress);
conf = mrCluster.createJobConf();
- createInputFile(conf, "/tmp/input");
+ createInputFile(conf, inputPath);
RunningJob job = runJob(conf);
LOG.info("Job details: " + job);
@@ -72,6 +70,9 @@ public class TestJobHistoryServer extend
} catch (IOException e) {
LOG.error("Failure running test", e);
Assert.fail(e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.error("Exit due to being interrupted");
+ Assert.fail(e.getMessage());
} finally {
if (mrCluster != null) mrCluster.shutdown();
}
@@ -100,7 +101,7 @@ public class TestJobHistoryServer extend
LOG.info("******** History Address: " + historyAddress);
conf = mrCluster.createJobConf();
- createInputFile(conf, "/tmp/input");
+ createInputFile(conf, inputPath);
RunningJob job = runJob(conf);
LOG.info("Job details: " + job);
@@ -111,6 +112,9 @@ public class TestJobHistoryServer extend
} catch (IOException e) {
LOG.error("Failure running test", e);
Assert.fail(e.getMessage());
+ } catch (InterruptedException e) {
+ LOG.error("Exit due to being interrupted");
+ Assert.fail(e.getMessage());
} finally {
if (mrCluster != null) mrCluster.shutdown();
try {
@@ -145,17 +149,33 @@ public class TestJobHistoryServer extend
conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
- FileInputFormat.setInputPaths(conf, "/tmp/input");
+ FileInputFormat.setInputPaths(conf, inputPath);
return JobClient.runJob(conf);
}
- private String getRedirectUrl(String jobUrl) throws IOException {
+ private String getRedirectUrl(String jobUrl) throws IOException, InterruptedException {
HttpClient client = new HttpClient();
GetMethod method = new GetMethod(jobUrl);
method.setFollowRedirects(false);
try {
int status = client.executeMethod(method);
+ if(status!=HttpURLConnection.HTTP_MOVED_TEMP) {
+ int retryTimes = 4;
+ for(int i = 1; i < retryTimes + 1; i++) {
+ try {
+ // Wait i sec
+ Thread.sleep(i * 1000);
+ } catch (InterruptedException e) {
+ throw new InterruptedException("Exit due to being interrupted");
+ }
+ // Get the latest status
+ status = client.executeMethod(method);
+ if(status == HttpURLConnection.HTTP_MOVED_TEMP)
+ break;
+ }
+ }
+
Assert.assertEquals(status, HttpURLConnection.HTTP_MOVED_TEMP);
LOG.info("Location: " + method.getResponseHeader("Location"));
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
@@ -10,7 +25,6 @@ import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -20,17 +34,35 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.StaticMapping;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
-import junit.framework.TestCase;
-
-public class TestJobInProgress extends TestCase {
+public class TestJobInProgress {
static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
- private MiniMRCluster mrCluster;
+ private static MiniMRCluster mrCluster;
+
+ private static MiniDFSCluster dfsCluster;
+ private static JobTracker jt;
+
+ static final String trackers[] = new String[] {
+ "tracker_tracker1.r1.com:1000", "tracker_tracker2.r1.com:1000",
+ "tracker_tracker3.r2.com:1000", "tracker_tracker4.r3.com:1000" };
+
+ static final String[] hosts = new String[] { "tracker1.r1.com",
+ "tracker2.r1.com", "tracker3.r2.com", "tracker4.r3.com" };
+
+ static final String[] racks = new String[] { "/r1", "/r1", "/r2", "/r3" };
- private MiniDFSCluster dfsCluster;
- JobTracker jt;
private static Path TEST_DIR =
new Path("/tmp/TestJobInProgress", "jip-testing");
private static int numSlaves = 4;
@@ -72,22 +104,43 @@ public class TestJobInProgress extends T
}
- @Override
- protected void setUp() throws Exception {
- // TODO Auto-generated method stub
- super.setUp();
+ @BeforeClass
+ public static void setUp() throws Exception {
Configuration conf = new Configuration();
+ conf.set("mapreduce.jobtracker.address", "localhost:0");
+ conf.set("mapreduce.jobtracker.http.address", "0.0.0.0:0");
+ conf.setClass("topology.node.switch.mapping.impl", StaticMapping.class,
+ DNSToSwitchMapping.class);
dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
.getUri().toString(), 1);
jt = mrCluster.getJobTrackerRunner().getJobTracker();
+ // Set up the Topology Information
+ for (int i = 0; i < hosts.length; i++) {
+ StaticMapping.addNodeToRack(hosts[i], racks[i]);
+ }
+ for (String s : trackers) {
+ FakeObjectUtilities.establishFirstContact(jt, s);
+ }
}
+ @Test
public void testPendingMapTaskCount() throws Exception {
launchTask(FailMapTaskJob.class, IdentityReducer.class);
checkTaskCounts();
}
+
+ /**
+ * Test to ensure that the job works when slow start is used and
+ * some tasks are allowed to fail
+ */
+ @Test
+ public void testSlowStartAndFailurePercent() throws Exception {
+ launchTaskSlowStart(FailMapTaskJob.class, IdentityReducer.class);
+ checkTaskCounts();
+ }
+ @Test
public void testPendingReduceTaskCount() throws Exception {
launchTask(IdentityMapper.class, FailReduceTaskJob.class);
checkTaskCounts();
@@ -113,7 +166,7 @@ public class TestJobInProgress extends T
job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
- // Disable slow-start for reduces since this maps don't complete
+ // Disable slow-start for reduces since this maps don't complete
// in these test-cases...
job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
@@ -174,6 +227,7 @@ public class TestJobInProgress extends T
}
}
+ @Test
public void testRunningTaskCount() throws Exception {
// test with spec = false and locality=true
testRunningTaskCount(false, true);
@@ -188,13 +242,46 @@ public class TestJobInProgress extends T
testRunningTaskCount(true, false);
}
- @Override
- protected void tearDown() throws Exception {
+ @Test
+ public void testLocality() throws Exception {
+ NetworkTopology nt = new NetworkTopology();
+
+ Node r1n1 = new NodeBase("/default/rack1/node1");
+ nt.add(r1n1);
+ Node r1n2 = new NodeBase("/default/rack1/node2");
+ nt.add(r1n2);
+
+ Node r2n3 = new NodeBase("/default/rack2/node3");
+ nt.add(r2n3);
+
+ LOG.debug("r1n1 parent: " + r1n1.getParent() + "\n" +
+ "r1n2 parent: " + r1n2.getParent() + "\n" +
+ "r2n3 parent: " + r2n3.getParent());
+
+ // Same host
+ assertEquals(0, JobInProgress.getMatchingLevelForNodes(r1n1, r1n1, 3));
+ // Same rack
+ assertEquals(1, JobInProgress.getMatchingLevelForNodes(r1n1, r1n2, 3));
+ // Different rack
+ assertEquals(2, JobInProgress.getMatchingLevelForNodes(r1n1, r2n3, 3));
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
mrCluster.shutdown();
dfsCluster.shutdown();
- super.tearDown();
}
+ void launchTaskSlowStart(Class MapClass,Class ReduceClass) throws Exception{
+ JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
+ // set it so no reducers start until all maps finished
+ job.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);
+ // allow all maps to fail
+ job.setInt("mapred.max.map.failures.percent", 100);
+ try {
+ JobClient.runJob(job);
+ } catch (IOException ioe) {}
+ }
void launchTask(Class MapClass,Class ReduceClass) throws Exception{
JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
@@ -255,5 +342,15 @@ public class TestJobInProgress extends T
}
}
}
-
+
+ @Test
+ public void testScheduleReducesConsiderFailedMapTips() throws Exception {
+ JobInProgress jip = Mockito.mock(JobInProgress.class);
+ Mockito.when(jip.scheduleReduces()).thenCallRealMethod();
+ jip.failedMapTIPs = 10;
+ jip.finishedMapTasks = 50;
+ jip.completedMapsForReduceSlowstart = 60;
+ assertTrue("The Reduce is not scheduled", jip.scheduleReduces());
+ }
+
}
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobLocalizer.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestJobLocalizer {
+
+ @Test(timeout = 1000)
+ public void testConcurrentJobLocalizers() throws IOException {
+ final String LOCAL_DIR = "/tmp/mapred/local";
+ JobConf conf = new JobConf(new Configuration());
+
+ JobLocalizer localizer1 = new JobLocalizer(conf, "user1", "jobid1",
+ LOCAL_DIR);
+ JobLocalizer localizer2 = new JobLocalizer(conf, "user2", "jobid2",
+ LOCAL_DIR);
+ assertTrue("Localizer 1 job local dirs should have user1",
+ localizer1.ttConf.get(JobLocalizer.JOB_LOCAL_CTXT).contains("user1"));
+ assertTrue("Localizer 2 job local dirs should have user2",
+ localizer2.ttConf.get(JobLocalizer.JOB_LOCAL_CTXT).contains("user2"));
+ }
+}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Jun 21 06:37:27 2013
@@ -238,6 +238,12 @@ public class TestJobQueueTaskScheduler e
status.setRunState(TaskStatus.State.RUNNING);
trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
}
+
+ @Override
+ public boolean isInSafeMode() {
+ // TODO Auto-generated method stub
+ return false;
+ }
}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java Fri Jun 21 06:37:27 2013
@@ -39,6 +39,13 @@ public class TestJobStatusPersistency ex
protected void setUp() throws Exception {
// Don't start anything by default
}
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ FileSystem fs = FileSystem.getLocal(new JobConf());
+ fs.delete(TEST_DIR, true);
+ }
private JobID runJob() throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
@@ -88,7 +95,7 @@ public class TestJobStatusPersistency ex
Properties config = new Properties();
config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
- startCluster(false, config);
+ startCluster(true, config);
JobID jobId = runJob();
JobClient jc = new JobClient(createJobConf());
RunningJob rj0 = jc.getJob(jobId);
@@ -119,11 +126,8 @@ public class TestJobStatusPersistency ex
/**
* Test if the completed job status is persisted to localfs.
*/
- public void testLocalPersistency() throws Exception {
+ public void testLocalPersistency() throws Exception {
FileSystem fs = FileSystem.getLocal(new JobConf());
-
- fs.delete(TEST_DIR, true);
-
Properties config = new Properties();
config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
@@ -138,7 +142,6 @@ public class TestJobStatusPersistency ex
// check if the local fs has the data
Path jobInfo = new Path(TEST_DIR, rj.getID() + ".info");
assertTrue("Missing job info from the local fs", fs.exists(jobInfo));
- fs.delete(TEST_DIR, true);
}
/**
@@ -153,10 +156,6 @@ public class TestJobStatusPersistency ex
try {
FileSystem fs = FileSystem.getLocal(new JobConf());
- if (fs.exists(TEST_DIR) && !fs.delete(TEST_DIR, true)) {
- fail("Cannot delete TEST_DIR!");
- }
-
if (fs.mkdirs(new Path(TEST_DIR, parent))) {
if (FileUtil.chmod(parent.toUri().getPath(), "-w") != 0) {
fail("Cannot chmod parent!");
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerPlugins.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+//import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.ServicePlugin;
+import org.junit.Test;
+
+public class TestJobTrackerPlugins extends TestCase {
+
+ static class FakeServicePlugin implements ServicePlugin {
+
+ private static FakeServicePlugin instance;
+
+ public static FakeServicePlugin getInstance() {
+ return instance;
+ }
+
+ private Object service;
+ private boolean stopped;
+
+ public Object getService() {
+ return service;
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public FakeServicePlugin() {
+ // store static reference to instance so we can retrieve it in the test
+ instance = this;
+ }
+
+ @Override
+ public void start(Object service) {
+ this.service = service;
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ JobConf conf = new JobConf();
+ conf.set("mapred.job.tracker", "localhost:0");
+ conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ conf.setClass("mapreduce.jobtracker.plugins", FakeServicePlugin.class,
+ ServicePlugin.class);
+
+ assertNull("Plugin not created", FakeServicePlugin.getInstance());
+
+ JobTracker jobTracker = JobTracker.startTracker(conf);
+ assertNotNull("Plugin created", FakeServicePlugin.getInstance());
+ assertSame("Service is jobTracker",
+ FakeServicePlugin.getInstance().getService(), jobTracker);
+ assertFalse("Plugin not stopped",
+ FakeServicePlugin.getInstance().isStopped());
+
+ jobTracker.close();
+ assertTrue("Plugin stopped", FakeServicePlugin.getInstance().isStopped());
+ }
+
+}
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.SafeModeAction;
+import org.apache.hadoop.mapred.tools.MRAdmin;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A test for JobTracker safemode. In safemode, no tasks are scheduled, and
+ * no tasks are marked as failed (they are killed instead).
+ */
+public class TestJobTrackerQuiescence {
+ final Path testDir =
+ new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
+ final Path inDir = new Path(testDir, "input");
+ final Path shareDir = new Path(testDir, "share");
+ final Path outputDir = new Path(testDir, "output");
+
+ final int maxMapTasks = 1;
+
+ private MiniDFSCluster dfs;
+ private MiniMRCluster mr;
+ private FileSystem fileSys;
+ private JobTracker jt;
+
+ private static final Log LOG =
+ LogFactory.getLog(TestJobTrackerQuiescence.class);
+
+ @Before
+ public void setUp() throws IOException {
+
+ Configuration conf = new Configuration();
+ conf.setBoolean("dfs.replication.considerLoad", false);
+ dfs = new MiniDFSCluster(conf, 1, true, null, null);
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+
+ // clean up
+ fileSys.delete(testDir, true);
+
+ if (!fileSys.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+
+ // Write the input file
+ UtilsForTests.writeFile(dfs.getNameNode(), conf,
+ new Path(inDir + "/file"), (short)1);
+
+ dfs.startDataNodes(conf, 1, true, null, null, null, null);
+ dfs.waitActive();
+ String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+ + (dfs.getFileSystem()).getUri().getPort();
+
+ JobConf jtConf = new JobConf();
+ jtConf.setInt("mapred.tasktracker.map.tasks.maximum", maxMapTasks);
+ jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+ jtConf.setBoolean(JobTracker.JT_HDFS_MONITOR_ENABLE, true);
+ jtConf.setInt(JobTracker.JT_HDFS_MONITOR_THREAD_INTERVAL, 1000);
+ mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
+ mr.waitUntilIdle();
+ mr.setInlineCleanupThreads();
+ jt = mr.getJobTrackerRunner().getJobTracker();
+ }
+
+ @After
+ public void tearDown() {
+ if (mr != null) {
+ try {
+ mr.shutdown();
+ } catch (Exception e) {}
+ }
+ if (dfs != null) {
+ try {
+ dfs.shutdown();
+ } catch (Exception e) {}
+ }
+ }
+
+ @Test
+ public void testHDFSMonitor() throws Exception {
+ /*
+ * Try 'automatic' safe-mode
+ */
+ // Put HDFS in safe-mode
+ dfs.getNameNode().setSafeMode(
+ org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+ int numTries = 20;
+ while (!jt.isInSafeMode() && numTries > 0) {
+ Thread.sleep(1000);
+ --numTries;
+ }
+
+ // By now JT should be in safe-mode
+ assertEquals(true, jt.isInSafeMode());
+
+ // Remove HDFS from safe-mode
+ dfs.getNameNode().setSafeMode(
+ org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+ numTries = 20;
+ while (jt.isInSafeMode() && numTries > 0) {
+ Thread.sleep(1000);
+ --numTries;
+ }
+
+ // By now JT should not be in safe-mode
+ assertEquals(false, jt.isInSafeMode());
+
+ /*
+ * Now ensure 'automatic' mode doesn't interfere with 'admin set' safe-mode
+ */
+ dfs.getNameNode().setSafeMode(
+ org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+ numTries = 20;
+ while (!jt.isInSafeMode() && numTries > 0) {
+ Thread.sleep(1000);
+ --numTries;
+ }
+
+ // By now JT should be in safe-mode
+ assertEquals(true, jt.isInSafeMode());
+
+ // Now, put JT in admin set safe-mode
+ enterSafeMode();
+
+ // Bring HDFS back from safe-mode
+ dfs.getNameNode().setSafeMode(
+ org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+ numTries = 20;
+ while (jt.isInSafeMode() && numTries > 0) {
+ Thread.sleep(1000);
+ --numTries;
+ }
+
+ // But now JT should *still* be in safe-mode
+ assertEquals(true, jt.isInSafeMode());
+ assertEquals(true, jt.isInAdminSafeMode());
+
+ // Leave JT safe-mode
+ leaveSafeMode();
+ assertEquals(false, jt.isInAdminSafeMode());
+
+ // Bounce HDFS back in-out
+ dfs.getNameNode().setSafeMode(
+ org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_ENTER);
+ Thread.sleep(5000);
+ dfs.getNameNode().setSafeMode(
+ org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+ numTries = 20;
+ while (jt.isInSafeMode() && numTries > 0) {
+ Thread.sleep(1000);
+ --numTries;
+ }
+
+ // By now JT should not be in safe-mode
+ assertEquals(false, jt.isInSafeMode());
+
+ }
+
+ @Test
+ public void testMRAdminSafeModeWait() throws Exception {
+
+ enterSafeMode();
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<Void> future = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MRAdmin mrAdmin = new MRAdmin(mr.createJobConf());
+ mrAdmin.run(new String[] { "-safemode", "wait" });
+ return null;
+ }
+ });
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ fail("JT should still be in safemode");
+ } catch (TimeoutException e) {
+ // expected
+ }
+
+ leaveSafeMode();
+
+ try {
+ future.get(10, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("JT should no longer be in safemode");
+ }
+ }
+
+ @Test
+ public void testJobsPauseInSafeMode() throws Exception {
+ FileSystem fileSys = dfs.getFileSystem();
+ JobConf jobConf = mr.createJobConf();
+ int numMaps = 10;
+ int numReds = 1;
+ String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+ String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+ jobConf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
+ // Configure the job
+ JobConf job = configureJob(jobConf, numMaps, numReds,
+ mapSignalFile, redSignalFile);
+
+ fileSys.delete(shareDir, true);
+
+ // Submit the job
+ JobClient jobClient = new JobClient(job);
+ RunningJob rJob = jobClient.submitJob(job);
+ JobID id = rJob.getID();
+
+ // wait for the job to be inited
+ mr.initializeJob(id);
+
+ // Make sure that the master job is 50% completed
+ while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
+ UtilsForTests.waitFor(10);
+ }
+ assertEquals(numMaps / 2, getCompletedMapCount(rJob));
+
+ enterSafeMode();
+
+ // Signal all the maps to complete
+ UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
+
+ // Signal the reducers to complete
+ UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile,
+ redSignalFile);
+
+ // only assigned maps complete in safemode since no more maps may be
+ // assigned
+ Thread.sleep(10000);
+ assertEquals(numMaps / 2 + maxMapTasks, getCompletedMapCount(rJob));
+
+ leaveSafeMode();
+
+ // job completes after leaving safemode
+ UtilsForTests.waitTillDone(jobClient);
+
+ assertTrue(rJob.isSuccessful());
+ }
+
+ private int getCompletedMapCount(RunningJob rJob) throws IOException {
+ TaskCompletionEvent[] taskCompletionEvents = rJob.getTaskCompletionEvents(0);
+ int mapCount = 0;
+ for (TaskCompletionEvent tce : taskCompletionEvents) {
+ if (tce.isMap) {
+ mapCount++;
+ }
+ }
+ return mapCount;
+ }
+
+ private JobConf configureJob(JobConf conf, int maps, int reduces,
+ String mapSignal, String redSignal) throws IOException {
+ UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, maps,
+ reduces, "test-jt-safemode", mapSignal, redSignal);
+ return conf;
+ }
+
+ private void enterSafeMode() throws IOException {
+ jt.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+ }
+
+ private void leaveSafeMode() throws IOException {
+ jt.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+ }
+}
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Fri Jun 21 06:37:27 2013
@@ -17,23 +17,24 @@
*/
package org.apache.hadoop.mapred;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.TestJobTrackerRestart;
+import org.junit.Test;
+
-import junit.framework.TestCase;
import java.io.*;
-import org.junit.*;
+
/**
* This test checks if the jobtracker can detect and recover a tracker that was
* lost while the jobtracker was down.
*/
-/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
- */
-@Ignore
-public class TestJobTrackerRestartWithLostTracker extends TestCase {
+
+public class TestJobTrackerRestartWithLostTracker {
final Path testDir = new Path("/jt-restart-lost-tt-testing");
final Path inDir = new Path(testDir, "input");
final Path shareDir = new Path(testDir, "share");
@@ -53,11 +54,14 @@ public class TestJobTrackerRestartWithLo
throws IOException {
FileSystem fileSys = dfs.getFileSystem();
JobConf jobConf = mr.createJobConf();
- int numMaps = 50;
+ int numMaps = 2;
int numReds = 1;
String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
-
+
+ // Enable recovery on restart
+ mr.getJobTrackerConf()
+ .setBoolean("mapred.jobtracker.restart.recover", true);
// Configure the jobs
JobConf job = configureJob(jobConf, numMaps, numReds,
mapSignalFile, redSignalFile);
@@ -84,10 +88,6 @@ public class TestJobTrackerRestartWithLo
// Signal the maps to complete
UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
- // Enable recovery on restart
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
-
// Kill the 2nd tasktracker
mr.stopTaskTracker(1);
@@ -102,6 +102,8 @@ public class TestJobTrackerRestartWithLo
// Wait for the JT to be ready
UtilsForTests.waitForJobTracker(jobClient);
+ // Signal the maps to complete
+ UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
// Signal the reducers to complete
UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile,
redSignalFile);
@@ -113,11 +115,9 @@ public class TestJobTrackerRestartWithLo
+ "upon restart",
jobClient.getClusterStatus().getTaskTrackers(), 1);
- // validate the history file
- TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
- TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
+ assertTrue("Job should be successful", rJob.isSuccessful());
}
-
+ @Test
public void testRestartWithLostTracker() throws IOException {
String namenode = null;
MiniDFSCluster dfs = null;