Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC
svn commit: r1495297 [24/46] - in /hadoop/common/branches/branch-1-win: ./
bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/
src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java Fri Jun 21 06:37:27 2013
@@ -351,6 +351,11 @@ class Child {
// Do the task-type specific localization
task.localizeConfiguration(jobConf);
+ // Write files required to re-run the task with IsolationRunner
+ if (task.supportIsolationRunner(jobConf)) {
+ task.writeFilesRequiredForRerun(jobConf);
+ }
+
//write the localized task jobconf
LocalDirAllocator lDirAlloc =
new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Fri Jun 21 06:37:27 2013
@@ -27,9 +27,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
import org.apache.hadoop.security.UserGroupInformation;
-class CleanupQueue {
+public class CleanupQueue {
public static final Log LOG =
LogFactory.getLog(CleanupQueue.class);
@@ -56,10 +57,39 @@ class CleanupQueue {
static class PathDeletionContext {
final Path fullPath;// full path of file or dir
final Configuration conf;
+ final UserGroupInformation ugi;
+ final JobID jobIdTokenRenewalToCancel;
public PathDeletionContext(Path fullPath, Configuration conf) {
+ this(fullPath, conf, null, null);
+ }
+
+ public PathDeletionContext(Path fullPath, Configuration conf,
+ UserGroupInformation ugi) {
+ this(fullPath, conf, ugi, null);
+ }
+
+ /**
+ * PathDeletionContext ctor which also allows for a job-delegation token
+ * renewal to be cancelled.
+ *
+ * This is usually used at the end of a job to delete its final path and
+ * to cancel renewal of its job-delegation token.
+ *
+ * @param fullPath path to be deleted
+ * @param conf job configuration
+ * @param ugi ugi of the job to be used to delete the path
+ * @param jobIdTokenRenewalToCancel jobId of the job whose job-delegation
+ * token renewal should be cancelled. No
+ * cancellation is attempted if this is
+ * <code>null</code>
+ */
+ public PathDeletionContext(Path fullPath, Configuration conf,
+ UserGroupInformation ugi, JobID jobIdTokenRenewalToCancel) {
this.fullPath = fullPath;
this.conf = conf;
+ this.ugi = ugi;
+ this.jobIdTokenRenewalToCancel = jobIdTokenRenewalToCancel;
}
protected Path getPathForCleanup() {
@@ -72,13 +102,20 @@ class CleanupQueue {
*/
protected void deletePath() throws IOException, InterruptedException {
final Path p = getPathForCleanup();
- UserGroupInformation.getLoginUser().doAs(
+ (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
p.getFileSystem(conf).delete(p, true);
return null;
}
});
+
+ // Cancel renewal of job-delegation token if necessary
+ if (jobIdTokenRenewalToCancel != null &&
+ conf.getBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, true)) {
+ DelegationTokenRenewal.removeDelegationTokenRenewalForJob(
+ jobIdTokenRenewalToCancel);
+ }
}
@Override
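The new four-argument constructor couples path deletion with cancellation of the job's
delegation-token renewal. A minimal usage sketch, assuming in-package access to the
nested PathDeletionContext class; the job id, user, and path below are illustrative:

    // Delete the job's final path as the job's own user, then cancel
    // renewal of its delegation token (illustrative values throughout).
    JobID jobId = JobID.forName("job_201306210001_0001");
    UserGroupInformation jobUgi = UserGroupInformation.createRemoteUser("alice");
    Path jobDir = new Path("/tmp/hadoop/mapred/staging/alice/" + jobId);
    Configuration conf = new Configuration();

    CleanupQueue.PathDeletionContext ctx =
        new CleanupQueue.PathDeletionContext(jobDir, conf, jobUgi, jobId);
    // When the cleanup thread later calls ctx.deletePath(), the delete runs
    // as jobUgi; afterwards, unless
    // mapreduce.job.complete.cancel.delegation.tokens is set to false, the
    // job is removed from DelegationTokenRenewal's renewal list.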
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Fri Jun 21 06:37:27 2013
@@ -329,6 +329,7 @@ public class ClusterStatus implements Wr
*
* @return the size of heap memory used by the <code>JobTracker</code>
*/
+ @Deprecated
public long getUsedMemory() {
return used_memory;
}
@@ -338,6 +339,7 @@ public class ClusterStatus implements Wr
*
* @return the configured size of max heap memory that can be used by the <code>JobTracker</code>
*/
+ @Deprecated
public long getMaxMemory() {
return max_memory;
}
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Counters.java Fri Jun 21 06:37:27 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.util.StringUtils;
/**
@@ -59,18 +60,22 @@ public class Counters implements Writabl
private static char[] charsToEscape = {GROUP_OPEN, GROUP_CLOSE,
COUNTER_OPEN, COUNTER_CLOSE,
UNIT_OPEN, UNIT_CLOSE};
+ private static final JobConf conf = new JobConf();
/** limit on the size of the name of the group **/
- private static final int GROUP_NAME_LIMIT = 128;
+ private static final int GROUP_NAME_LIMIT =
+ conf.getInt("mapreduce.job.counters.group.name.max", 128);
/** limit on the size of the counter name **/
- private static final int COUNTER_NAME_LIMIT = 64;
+ private static final int COUNTER_NAME_LIMIT =
+ conf.getInt("mapreduce.job.counters.counter.name.max", 64);
- private static final JobConf conf = new JobConf();
/** limit on counters **/
public static int MAX_COUNTER_LIMIT =
- conf.getInt("mapreduce.job.counters.limit", 120);
+ conf.getInt("mapreduce.job.counters.limit", // deprecated in 0.23
+ conf.getInt("mapreduce.job.counters.max", 120));
/** the max groups allowed **/
- static final int MAX_GROUP_LIMIT = 50;
+ public static final int MAX_GROUP_LIMIT =
+ conf.getInt("mapreduce.job.counters.groups.max", 50);
/** the number of current counters**/
private int numCounters = 0;
@@ -163,7 +168,7 @@ public class Counters implements Writabl
Group(String groupName) {
try {
- bundle = getResourceBundle(groupName);
+ bundle = CounterGroup.getResourceBundle(groupName);
}
catch (MissingResourceException neverMind) {
}
@@ -291,7 +296,7 @@ public class Counters implements Writabl
* @deprecated use {@link #getCounter(String)} instead
*/
@Deprecated
- public synchronized Counter getCounter(int id, String name) {
+ public Counter getCounter(int id, String name) {
return getCounterForName(name);
}
@@ -300,23 +305,27 @@ public class Counters implements Writabl
* @param name the internal counter name
* @return the counter
*/
- public synchronized Counter getCounterForName(String name) {
- String shortName = getShortName(name, COUNTER_NAME_LIMIT);
- Counter result = subcounters.get(shortName);
- if (result == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + shortName);
- }
- numCounters = (numCounters == 0) ? Counters.this.size(): numCounters;
- if (numCounters >= MAX_COUNTER_LIMIT) {
- throw new CountersExceededException("Error: Exceeded limits on number of counters - "
- + "Counters=" + numCounters + " Limit=" + MAX_COUNTER_LIMIT);
+ public Counter getCounterForName(String name) {
+ synchronized(Counters.this) { // lock ordering: Counters then Group
+ synchronized (this) {
+ String shortName = getShortName(name, COUNTER_NAME_LIMIT);
+ Counter result = subcounters.get(shortName);
+ if (result == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding " + shortName);
+ }
+ numCounters = (numCounters == 0) ? Counters.this.size(): numCounters;
+ if (numCounters >= MAX_COUNTER_LIMIT) {
+ throw new CountersExceededException("Error: Exceeded limits on number of counters - "
+ + "Counters=" + numCounters + " Limit=" + MAX_COUNTER_LIMIT);
+ }
+ result = new Counter(shortName, localize(shortName + ".name", shortName), 0L);
+ subcounters.put(shortName, result);
+ numCounters++;
+ }
+ return result;
}
- result = new Counter(shortName, localize(shortName + ".name", shortName), 0L);
- subcounters.put(shortName, result);
- numCounters++;
}
- return result;
}
/**
@@ -377,15 +386,6 @@ public class Counters implements Writabl
private Map<Enum, Counter> cache = new IdentityHashMap<Enum, Counter>();
/**
- * Returns the specified resource bundle, or throws an exception.
- * @throws MissingResourceException if the bundle isn't found
- */
- private static ResourceBundle getResourceBundle(String enumClassName) {
- String bundleName = enumClassName.replace('$','_');
- return ResourceBundle.getBundle(bundleName);
- }
-
- /**
* Returns the names of all counter classes.
* @return Set of counter names.
*/
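Note that the limits above are read through a static "new JobConf()", so they are
picked up from the default configuration resources on the daemon's classpath
(e.g. mapred-site.xml), not from a per-job conf. An illustrative check of the
effective values, mirroring the deprecation fallback in the patch:

    // Values are examples; setting these on a per-job conf has no effect
    // because Counters resolves them once at class-load time.
    JobConf conf = new JobConf();
    int maxCounters = conf.getInt("mapreduce.job.counters.limit",      // pre-0.23 name
                      conf.getInt("mapreduce.job.counters.max", 120)); // current name
    int maxGroups = conf.getInt("mapreduce.job.counters.groups.max", 50);
    System.out.println("counter limit=" + maxCounters + ", group limit=" + maxGroups);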
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Jun 21 06:37:27 2013
@@ -185,8 +185,23 @@ public class DefaultTaskController exten
String commandFile = writeCommand(cmdLine, rawFs, p).getAbsolutePath();
rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
- String[] commandArray = Shell.getRunCommand(commandFile, attemptId);
- shExec = new ShellCommandExecutor(commandArray, currentWorkDirectory);
+ if (Shell.WINDOWS) {
+ String[] commandArray = Shell.getRunCommand(commandFile, attemptId);
+ shExec = new ShellCommandExecutor(commandArray, currentWorkDirectory);
+ } else {
+ /*
+ * MAPREDUCE-2374: if another thread fork(2)ed a child process during the
+ * window when writeCommand (above) had taskjvm.sh open for write, that
+ * child process might still have a writeable fd open to the script.
+ *
+ * If we run the script with "bash -c /path/to/taskjvm.sh", then bash
+ * would try to execve(2) the script and get ETXTBSY. Instead, just have
+ * bash interpret the script with "bash /path/to/taskjvm.sh".
+ */
+ shExec = new ShellCommandExecutor(new String[]{
+ "bash", commandFile},
+ currentWorkDirectory);
+ }
shExec.execute();
} catch (Exception e) {
if (shExec == null) {
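The distinction is that "bash -c /path/to/taskjvm.sh" makes bash execve(2) the script
as a program, which can fail with ETXTBSY while a forked child still holds the script's
write descriptor, whereas "bash /path/to/taskjvm.sh" has bash open and interpret the
file itself. A condensed sketch of the safe form, mirroring the patch with illustrative
names:

    // Safe on non-Windows: have bash *interpret* the freshly written script
    // rather than execve(2) it, sidestepping ETXTBSY (MAPREDUCE-2374).
    String[] cmd = new String[] { "bash", commandFile }; // not {"bash", "-c", commandFile}
    ShellCommandExecutor shExec =
        new ShellCommandExecutor(cmd, currentWorkDirectory);
    shExec.execute();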
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileInputFormat_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
# ResourceBundle properties file for file-input-format counters
CounterGroupName= File Input Format Counters
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/FileOutputFormat_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
# ResourceBundle properties file for file-output-format counters
CounterGroupName= File Output Format Counters
Added: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java (added)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapred.JobTracker.SafeModeAction;
+
+public class HDFSMonitorThread extends Thread {
+
+ public static final Log LOG = LogFactory.getLog(HDFSMonitorThread.class);
+
+ private final JobTracker jt;
+ private final FileSystem fs;
+
+ private final int hdfsMonitorInterval;
+
+ public HDFSMonitorThread(Configuration conf, JobTracker jt, FileSystem fs) {
+ super("JT-HDFS-Monitor-Thread");
+ this.jt = jt;
+ this.fs = fs;
+ this.hdfsMonitorInterval =
+ conf.getInt(
+ JobTracker.JT_HDFS_MONITOR_THREAD_INTERVAL,
+ JobTracker.DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS);
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+
+ LOG.info("Starting HDFS Health Monitoring...");
+
+ boolean previouslyHealthy = true;
+ boolean done = false;
+
+ while (!done && !isInterrupted()) {
+
+ boolean currentlyHealthy = DistributedFileSystem.isHealthy(fs.getUri());
+ if (currentlyHealthy != previouslyHealthy) {
+
+ JobTracker.SafeModeAction action;
+ if (currentlyHealthy) {
+ action = SafeModeAction.SAFEMODE_LEAVE;
+ LOG.info("HDFS healthy again, instructing JobTracker to leave " +
+ "'safemode' ...");
+ } else {
+ action = SafeModeAction.SAFEMODE_ENTER;
+ LOG.info("HDFS is unhealthy, instructing JobTracker to enter " +
+ "'safemode' ...");
+ }
+
+ try {
+ if (jt.isInAdminSafeMode()) {
+ // Don't override admin-set safemode
+ LOG.info("JobTracker is in admin-set safemode, not overriding " +
+ "through " + action);
+ previouslyHealthy = currentlyHealthy;
+ } else {
+ previouslyHealthy = !(jt.setSafeModeInternal(action));
+ //safemode => !healthy
+ }
+ } catch (IOException ioe) {
+ LOG.info("Failed to setSafeMode with action " + action, ioe);
+ }
+ }
+
+ try {
+ Thread.sleep(hdfsMonitorInterval);
+ } catch (InterruptedException e) {
+ done = true;
+ }
+ }
+
+ LOG.info("Stoping HDFS Health Monitoring...");
+ }
+
+}
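A sketch of how the JobTracker side could wire this up; the call site is not part of
this file, and the interval and the jobTracker reference here are illustrative:

    // Poll HDFS health every 5 seconds; the daemon thread flips the
    // JobTracker in and out of safemode as HDFS health changes.
    // jobTracker: the enclosing JobTracker instance (assumed in scope).
    Configuration conf = new Configuration();
    conf.setInt(JobTracker.JT_HDFS_MONITOR_THREAD_INTERVAL, 5 * 1000);
    HDFSMonitorThread monitor =
        new HDFSMonitorThread(conf, jobTracker, FileSystem.get(conf));
    monitor.start();     // setDaemon(true) was done in the constructor
    // ... on shutdown:
    monitor.interrupt(); // run() exits its loop on InterruptedException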
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFile.java Fri Jun 21 06:37:27 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -291,7 +292,7 @@ class IFile {
CompressionCodec codec,
Counters.Counter readsCounter) throws IOException {
readRecordsCounter = readsCounter;
- checksumIn = new IFileInputStream(in,length);
+ checksumIn = new IFileInputStream(in,length, conf);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
this.in = codec.createInputStream(checksumIn, decompressor);
@@ -325,7 +326,8 @@ class IFile {
private int readData(byte[] buf, int off, int len) throws IOException {
int bytesRead = 0;
while (bytesRead < len) {
- int n = in.read(buf, off+bytesRead, len-bytesRead);
+ int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+ len - bytesRead);
if (n < 0) {
return bytesRead;
}
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Fri Jun 21 06:37:27 2013
@@ -19,11 +19,20 @@
package org.apache.hadoop.mapred;
import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.util.DataChecksum;
/**
* A checksum input stream, used for IFiles.
@@ -32,7 +41,8 @@ import org.apache.hadoop.util.DataChecks
class IFileInputStream extends InputStream {
- private final InputStream in; //The input stream to be verified for checksum.
+ private final InputStream in; //The input stream to be verified for checksum.
+ private final FileDescriptor inFd; // the file descriptor, if it is known
private final long length; //The total length of the input file
private final long dataLength;
private DataChecksum sum;
@@ -40,19 +50,66 @@ class IFileInputStream extends InputStre
private final byte b[] = new byte[1];
private byte csum[] = null;
private int checksumSize;
-
+
+ private ReadaheadRequest curReadahead = null;
+ private ReadaheadPool raPool = ReadaheadPool.getInstance();
+ private boolean readahead;
+ private int readaheadLength;
+
+ /**
+ * Configuration key to enable/disable IFile readahead.
+ */
+ public static final String MAPRED_IFILE_READAHEAD =
+ "mapreduce.ifile.readahead";
+
+ public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+ /**
+ * Configuration key to set the IFile readahead length in bytes.
+ */
+ public static final String MAPRED_IFILE_READAHEAD_BYTES =
+ "mapreduce.ifile.readahead.bytes";
+
+ public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+ 4 * 1024 * 1024;
+
+ public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
/**
* Create a checksum input stream that reads
* @param in The input stream to be verified for checksum.
* @param len The length of the input stream including checksum bytes.
*/
- public IFileInputStream(InputStream in, long len) {
+ public IFileInputStream(InputStream in, long len, Configuration conf) {
this.in = in;
+ this.inFd = getFileDescriptorIfAvail(in);
sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
Integer.MAX_VALUE);
checksumSize = sum.getChecksumSize();
length = len;
dataLength = length - checksumSize;
+
+ conf = (conf != null) ? conf : new Configuration();
+ readahead = conf.getBoolean(MAPRED_IFILE_READAHEAD,
+ DEFAULT_MAPRED_IFILE_READAHEAD);
+ readaheadLength = conf.getInt(MAPRED_IFILE_READAHEAD_BYTES,
+ DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+ doReadahead();
+ }
+
+ private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+ FileDescriptor fd = null;
+ try {
+ if (in instanceof HasFileDescriptor) {
+ fd = ((HasFileDescriptor)in).getFileDescriptor();
+ } else if (in instanceof FileInputStream) {
+ fd = ((FileInputStream)in).getFD();
+ }
+ } catch (IOException e) {
+ LOG.info("Unable to determine FileDescriptor", e);
+ }
+ return fd;
}
/**
@@ -61,6 +118,10 @@ class IFileInputStream extends InputStre
*/
@Override
public void close() throws IOException {
+
+ if (curReadahead != null) {
+ curReadahead.cancel();
+ }
if (currentOffset < dataLength) {
byte[] t = new byte[Math.min((int)
(Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -97,10 +158,21 @@ class IFileInputStream extends InputStre
if (currentOffset >= dataLength) {
return -1;
}
-
+
+ doReadahead();
+
return doRead(b,off,len);
}
+ private void doReadahead() {
+ if (raPool != null && inFd != null && readahead) {
+ curReadahead = raPool.readaheadStream(
+ "ifile", inFd,
+ currentOffset, readaheadLength, dataLength,
+ curReadahead);
+ }
+ }
+
/**
* Read bytes from the stream.
* At EOF, checksum is validated and sent back
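The readahead behavior is controlled by the two keys introduced above. An illustrative
in-package construction (the local file path is a placeholder):

    // Keep readahead enabled but shrink the window from the 4 MB default
    // to 1 MB (values illustrative).
    Configuration conf = new Configuration();
    conf.setBoolean(IFileInputStream.MAPRED_IFILE_READAHEAD, true);
    conf.setInt(IFileInputStream.MAPRED_IFILE_READAHEAD_BYTES, 1024 * 1024);
    FileInputStream rawIn = new FileInputStream("/tmp/spill0.out"); // illustrative path
    long fileLength = new File("/tmp/spill0.out").length();
    IFileInputStream checksummed = new IFileInputStream(rawIn, fileLength, conf);
    // Readahead is silently skipped when no FileDescriptor can be obtained
    // from the wrapped stream (see getFileDescriptorIfAvail above).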
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Jun 21 06:37:27 2013
@@ -72,8 +72,9 @@ interface InterTrackerProtocol extends V
* Version 29: Adding available memory and CPU usage information on TT to
* TaskTrackerStatus for MAPREDUCE-1218
* Version 30: Adding disk failure to TaskTrackerStatus for MAPREDUCE-3015
+ * Version 31: Adding version methods for HADOOP-8209
*/
- public static final long versionID = 30L;
+ public static final long versionID = 31L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
@@ -144,9 +145,13 @@ interface InterTrackerProtocol extends V
*/
public String getSystemDir();
-
/**
- * Returns the buildVersion of the JobTracker
+ * Returns the VersionInfo build version of the JobTracker
*/
public String getBuildVersion() throws IOException;
+
+ /**
+ * Returns the VersionInfo version of the JobTracker
+ */
+ public String getVIVersion() throws IOException;
}
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JSPUtil.java Fri Jun 21 06:37:27 2013
@@ -101,36 +101,43 @@ class JSPUtil {
final JobInProgress job = jt.getJob(jobid);
JobWithViewAccessCheck myJob = new JobWithViewAccessCheck(job);
+ if (!jt.areACLsEnabled() || job == null) {
+ return myJob;
+ }
+
String user = request.getRemoteUser();
- if (user != null && job != null && jt.areACLsEnabled()) {
- final UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(user);
- try {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws IOException, ServletException {
-
- // checks job view permission
- jt.getACLsManager().checkAccess(job, ugi,
- Operation.VIEW_JOB_DETAILS);
- return null;
- }
- });
- } catch (AccessControlException e) {
- String errMsg = "User " + ugi.getShortUserName() +
- " failed to view " + jobid + "!<br><br>" + e.getMessage() +
- "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
- JSPUtil.setErrorAndForward(errMsg, request, response);
- myJob.setViewAccess(false);
- } catch (InterruptedException e) {
- String errMsg = " Interrupted while trying to access " + jobid +
- "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
- JSPUtil.setErrorAndForward(errMsg, request, response);
- myJob.setViewAccess(false);
- }
+ if (user == null) {
+ JSPUtil.setErrorAndForward("Null user", request, response);
+ myJob.setViewAccess(false);
+ return myJob;
+ }
+
+ final UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(user);
+ try {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws IOException, ServletException {
+ // checks job view permission
+ jt.getACLsManager().checkAccess(job, ugi,
+ Operation.VIEW_JOB_DETAILS);
+ return null;
+ }
+ });
+ } catch (AccessControlException e) {
+ String errMsg = "User " + ugi.getShortUserName() +
+ " failed to view " + jobid + "!<br><br>" + e.getMessage() +
+ "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+ JSPUtil.setErrorAndForward(errMsg, request, response);
+ myJob.setViewAccess(false);
+ } catch (InterruptedException e) {
+ String errMsg = " Interrupted while trying to access " + jobid +
+ "<hr><a href=\"jobtracker.jsp\">Go back to JobTracker</a><br>";
+ JSPUtil.setErrorAndForward(errMsg, request, response);
+ myJob.setViewAccess(false);
}
return myJob;
}
-
+
/**
* Sets error code SC_UNAUTHORIZED in response and forwards to
* error page which contains error message and a back link.
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Jun 21 06:37:27 2013
@@ -40,6 +40,7 @@ import java.security.PrivilegedException
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,6 +60,10 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.Counters.Counter;
@@ -426,9 +431,17 @@ public class JobClient extends Configure
ensureFreshStatus();
return status.getFailureInfo();
}
+
+ @Override
+ public JobStatus getJobStatus() throws IOException {
+ updateStatus();
+ return status;
+ }
}
+ private JobSubmissionProtocol rpcJobSubmitClient;
private JobSubmissionProtocol jobSubmitClient;
+
private Path sysDir = null;
private Path stagingAreaDir = null;
@@ -439,6 +452,15 @@ public class JobClient extends Configure
private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
static int tasklogtimeout;
+ public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
+ "mapreduce.jobclient.retry.policy.enabled";
+ public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
+ false;
+ public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
+ "mapreduce.jobclient.retry.policy.spec";
+ public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+ "10000,6,60000,10"; //t1,n1,t2,n2,...
+
/**
* Create a job client.
*/
@@ -471,16 +493,72 @@ public class JobClient extends Configure
conf.setNumMapTasks(1);
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
- this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
+ this.rpcJobSubmitClient =
+ createRPCProxy(JobTracker.getAddress(conf), conf);
+ this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
}
}
private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr,
- UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+
+ JobSubmissionProtocol rpcJobSubmitClient =
+ (JobSubmissionProtocol)RPC.getProxy(
+ JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, addr,
+ UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class),
+ 0,
+ RetryUtils.getMultipleLinearRandomRetry(
+ conf,
+ MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+ MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+ MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+ MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT
+ ),
+ false);
+
+ return rpcJobSubmitClient;
+ }
+
+ private static JobSubmissionProtocol createProxy(
+ JobSubmissionProtocol rpcJobSubmitClient,
+ Configuration conf) throws IOException {
+
+ /*
+ * Default is to retry on JobTrackerNotYetInitializedException
+ * i.e. wait for JobTracker to get to RUNNING state and for
+ * SafeModeException
+ */
+ @SuppressWarnings("unchecked")
+ RetryPolicy defaultPolicy =
+ RetryUtils.getDefaultRetryPolicy(
+ conf,
+ MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+ MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+ MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+ MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+ JobTrackerNotYetInitializedException.class,
+ SafeModeException.class
+ );
+
+ /*
+ * Method specific retry policies for killJob and killTask...
+ *
+ * No retries on any exception including
+ * ConnectionException and SafeModeException
+ */
+ Map<String,RetryPolicy> methodNameToPolicyMap =
+ new HashMap<String,RetryPolicy>();
+ methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL);
+ methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL);
+
+ final JobSubmissionProtocol jsp = (JobSubmissionProtocol) RetryProxy.create(
+ JobSubmissionProtocol.class,
+ rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap);
+ RPC.checkVersion(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, jsp);
+ return jsp;
}
@InterfaceAudience.Private
@@ -496,7 +574,7 @@ public class JobClient extends Configure
public long renew(Token<?> token, Configuration conf
) throws IOException, InterruptedException {
InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
- JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+ JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
}
@@ -505,7 +583,7 @@ public class JobClient extends Configure
public void cancel(Token<?> token, Configuration conf
) throws IOException, InterruptedException {
InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
- JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+ JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
}
@@ -531,15 +609,16 @@ public class JobClient extends Configure
public JobClient(InetSocketAddress jobTrackAddr,
Configuration conf) throws IOException {
this.ugi = UserGroupInformation.getCurrentUser();
- jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+ rpcJobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+ jobSubmitClient = createProxy(rpcJobSubmitClient, conf);
}
/**
* Close the <code>JobClient</code>.
*/
public synchronized void close() throws IOException {
- if (!(jobSubmitClient instanceof LocalJobRunner)) {
- RPC.stopProxy(jobSubmitClient);
+ if (!(rpcJobSubmitClient instanceof LocalJobRunner)) {
+ RPC.stopProxy(rpcJobSubmitClient);
}
}
@@ -759,10 +838,9 @@ public class JobClient extends Configure
// First we check whether the cached archives and files are legal.
TrackerDistributedCacheManager.validate(job);
- // set the timestamps of the archives and files
- TrackerDistributedCacheManager.determineTimestamps(job);
- // set the public/private visibility of the archives and files
- TrackerDistributedCacheManager.determineCacheVisibilities(job);
+ // set the timestamps of the archives and files and set the
+ // public/private visibility of the archives and files
+ TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job);
// get DelegationTokens for cache files
TrackerDistributedCacheManager.getDelegationTokens(job,
job.getCredentials());
@@ -774,12 +852,20 @@ public class JobClient extends Configure
if ("".equals(job.getJobName())){
job.setJobName(new Path(originalJarPath).getName());
}
- Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
- job.setJar(submitJarFile.toString());
- fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
- fs.setReplication(submitJarFile, replication);
- fs.setPermission(submitJarFile,
- new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+ Path originalJarFile = new Path(originalJarPath);
+ URI jobJarURI = originalJarFile.toUri();
+ // If the job jar is already in fs, we don't need to copy it from local fs
+ if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
+ || !(jobJarURI.getScheme().equals(fs.getUri().getScheme())
+ && jobJarURI.getAuthority().equals(
+ fs.getUri().getAuthority()))) {
+ Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
+ job.setJar(submitJarFile.toString());
+ fs.copyFromLocalFile(originalJarFile, submitJarFile);
+ fs.setReplication(submitJarFile, replication);
+ fs.setPermission(submitJarFile,
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+ }
} else {
LOG.warn("No job jar file set. User classes may not be found. "+
"See JobConf(Class) or JobConf#setJar(String).");
@@ -909,6 +995,12 @@ public class JobClient extends Configure
FileSystem.create(fs, submitJobFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+ // removing jobtoken referrals before copying the jobconf to HDFS
+ // as the tasks don't need this setting, actually they may break
+ // because of it if present as the referral will point to a
+ // different job.
+ TokenCache.cleanUpTokenReferral(jobCopy);
+
try {
jobCopy.writeXml(out);
} finally {
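The retry spec is a flattened list of (sleep-ms, retry-count) pairs: the default
"10000,6,60000,10" means six retries 10 seconds apart, then ten more 60 seconds apart,
after which the call fails. A sketch of opting in from a client (values illustrative):

    // Opt in to client-side retries (off by default) and customize the
    // policy: five retries 2 s apart, then four more 30 s apart.
    JobConf conf = new JobConf();
    conf.setBoolean("mapreduce.jobclient.retry.policy.enabled", true);
    conf.set("mapreduce.jobclient.retry.policy.spec", "2000,5,30000,4"); // t1,n1,t2,n2
    JobClient client = new JobClient(conf);
    // killJob and killTask remain TRY_ONCE_THEN_FAIL regardless
    // (see methodNameToPolicyMap above), so kills are never retried.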
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Jun 21 06:37:27 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.lib.Hash
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
@@ -319,6 +320,26 @@ public class JobConf extends Configurati
public static final String MAPRED_REDUCE_TASK_ENV =
"mapred.reduce.child.env";
+ public static final String WORKFLOW_ID = "mapreduce.workflow.id";
+
+ public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
+
+ public static final String WORKFLOW_NODE_NAME =
+ "mapreduce.workflow.node.name";
+
+ public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
+ "mapreduce.workflow.adjacency.";
+
+ public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
+ "^mapreduce\\.workflow\\.adjacency\\..+";
+
+ public static final String WORKFLOW_TAGS = "mapreduce.workflow.tags";
+
+ public static final String MAPREDUCE_RECOVER_JOB =
+ "mapreduce.job.restart.recover";
+
+ public static final boolean DEFAULT_MAPREDUCE_RECOVER_JOB = true;
+
private Credentials credentials = new Credentials();
/**
@@ -429,7 +450,7 @@ public class JobConf extends Configurati
* @param cls the example class.
*/
public void setJarByClass(Class cls) {
- String jar = findContainingJar(cls);
+ String jar = ClassUtil.findContainingJar(cls);
if (jar != null) {
setJar(jar);
}
@@ -1797,38 +1818,6 @@ public class JobConf extends Configurati
return
(int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
}
-
- /**
- * Find a jar that contains a class of the same name, if any.
- * It will return a jar file, even if that is not the first thing
- * on the class path that has a class with the same name.
- *
- * @param my_class the class to find.
- * @return a jar file that contains the class, or null.
- * @throws IOException
- */
- private static String findContainingJar(Class my_class) {
- ClassLoader loader = my_class.getClassLoader();
- String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
- try {
- for(Enumeration itr = loader.getResources(class_file);
- itr.hasMoreElements();) {
- URL url = (URL) itr.nextElement();
- if ("jar".equals(url.getProtocol())) {
- String toReturn = url.getPath();
- if (toReturn.startsWith("file:")) {
- toReturn = toReturn.substring("file:".length());
- }
- toReturn = URLDecoder.decode(toReturn, "UTF-8");
- return toReturn.replaceAll("!.*$", "");
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return null;
- }
-
/**
* Get the memory required to run a task of this job, in bytes. See
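The workflow keys let a scheduler (Oozie, for example) stamp lineage onto a job so
JobHistory can log it. An illustrative use, with all values made up:

    // Tag a job with workflow lineage for downstream history consumers.
    JobConf job = new JobConf();
    job.set(JobConf.WORKFLOW_ID, "wf-2013-06-21-0001");
    job.set(JobConf.WORKFLOW_NAME, "daily-etl");
    job.set(JobConf.WORKFLOW_NODE_NAME, "aggregate");
    // Adjacency entries are keyed by source node; the value names its
    // successor. This one records the edge aggregate -> publish.
    job.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "aggregate", "publish");
    job.set(JobConf.WORKFLOW_TAGS, "etl,daily");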
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Jun 21 06:37:27 2013
@@ -119,8 +119,6 @@ public class JobHistory {
public static final int JOB_NAME_TRIM_LENGTH = 50;
private static String JOBTRACKER_UNIQUE_STRING = null;
- private static final String JOBHISTORY_DEBUG_MODE =
- "mapreduce.jobhistory.debug.mode";
private static String LOG_DIR = null;
private static final String SECONDARY_FILE_SUFFIX = ".recover";
private static long jobHistoryBlockSize = 0;
@@ -131,17 +129,15 @@ public class JobHistory {
final static FsPermission HISTORY_FILE_PERMISSION =
FsPermission.createImmutable((short) 0744); // rwxr--r--
private static FileSystem LOGDIR_FS; // log dir filesystem
- private static FileSystem DONEDIR_FS; // Done dir filesystem
+ protected static FileSystem DONEDIR_FS; // Done dir filesystem
private static JobConf jtConf;
- private static Path DONE = null; // folder for completed jobs
+ protected static Path DONE = null; // folder for completed jobs
private static String DONE_BEFORE_SERIAL_TAIL = doneSubdirsBeforeSerialTail();
private static String DONE_LEAF_FILES = DONE_BEFORE_SERIAL_TAIL + "/*";
private static boolean aclsEnabled = false;
static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
- // XXXXX debug mode -- set this to false for production
- private static boolean DEBUG_MODE;
private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
private static int SERIAL_NUMBER_LOW_DIGITS;
@@ -225,8 +221,11 @@ public class JobHistory {
void start() {
- executor = new ThreadPoolExecutor(1, 3, 1,
+ executor = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+ // allow core threads to terminate if there has been no work
+ // for the keepalive period.
+ executor.allowCoreThreadTimeOut(true);
}
private FilesHolder getFileHolder(JobID id) {
@@ -368,8 +367,8 @@ public class JobHistory {
timestamp.get(Calendar.YEAR),
// months are 0-based in Calendar, but people will expect January
// to be month #1.
- timestamp.get(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH) + 1,
- timestamp.get(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH));
+ timestamp.get(Calendar.MONTH) + 1,
+ timestamp.get(Calendar.DAY_OF_MONTH));
dateString = dateString.intern();
@@ -391,9 +390,9 @@ public class JobHistory {
synchronized (existingDoneSubdirs) {
if (existingDoneSubdirs.contains(dir)) {
- if (DEBUG_MODE && !DONEDIR_FS.exists(dir)) {
- System.err.println("JobHistory.maybeMakeSubdirectory -- We believed "
- + dir + " already existed, but it didn't.");
+ if (LOG.isDebugEnabled() && !DONEDIR_FS.exists(dir)) {
+ LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
+ + " already existed, but it didn't.");
}
return true;
@@ -411,9 +410,9 @@ public class JobHistory {
return false;
} else {
- if (DEBUG_MODE) {
- System.err.println("JobHistory.maybeMakeSubdirectory -- We believed "
- + dir + " didn't already exist, but it did.");
+ if (LOG.isDebugEnabled()) {
+ LOG.error("JobHistory.maybeMakeSubdirectory -- We believed " + dir
+ + " didn't already exist, but it did.");
}
return false;
@@ -476,7 +475,9 @@ public class JobHistory {
ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
- VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON
+ VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON, LOCALITY, AVATAAR,
+ WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES,
+ WORKFLOW_TAGS
}
/**
@@ -497,8 +498,7 @@ public class JobHistory {
public static void init(JobTracker jobTracker, JobConf conf,
String hostname, long jobTrackerStartTime) throws IOException {
initLogDir(conf);
- DEBUG_MODE = conf.getBoolean(JOBHISTORY_DEBUG_MODE, false);
- SERIAL_NUMBER_LOW_DIGITS = DEBUG_MODE ? 1 : 3;
+ SERIAL_NUMBER_LOW_DIGITS = 3;
SERIAL_NUMBER_FORMAT = ("%0"
+ (SERIAL_NUMBER_DIRECTORY_DIGITS + SERIAL_NUMBER_LOW_DIGITS)
+ "d");
@@ -547,8 +547,9 @@ public class JobHistory {
String doneLocation = conf.
get("mapred.job.tracker.history.completed.location");
if (doneLocation != null) {
- DONE = fs.makeQualified(new Path(doneLocation));
- DONEDIR_FS = fs;
+ Path donePath = new Path(doneLocation);
+ DONEDIR_FS = donePath.getFileSystem(conf);
+ DONE = DONEDIR_FS.makeQualified(donePath);
} else {
if (!setup) {
initLogDir(conf);
@@ -586,6 +587,19 @@ public class JobHistory {
}
fileManager.start();
+
+ HistoryCleaner.cleanupFrequency =
+ conf.getLong("mapreduce.jobhistory.cleaner.interval-ms",
+ HistoryCleaner.DEFAULT_CLEANUP_FREQUENCY);
+ HistoryCleaner.maxAgeOfHistoryFiles =
+ conf.getLong("mapreduce.jobhistory.max-age-ms",
+ HistoryCleaner.DEFAULT_HISTORY_MAX_AGE);
+ LOG.info(String.format("Job History MaxAge is %d ms (%.2f days), " +
+ "Cleanup Frequency is %d ms (%.2f days)",
+ HistoryCleaner.maxAgeOfHistoryFiles,
+ ((float) HistoryCleaner.maxAgeOfHistoryFiles)/HistoryCleaner.ONE_DAY_IN_MS,
+ HistoryCleaner.cleanupFrequency,
+ ((float) HistoryCleaner.cleanupFrequency)/HistoryCleaner.ONE_DAY_IN_MS));
}
/**
@@ -789,6 +803,10 @@ public class JobHistory {
Keys[] keys, String[] values) {
log(writers, recordType, keys, values, null);
}
+
+ static class JobHistoryLogger {
+ static final Log LOG = LogFactory.getLog(JobHistoryLogger.class);
+ }
/**
* Log a number of keys and values with record. the array length of keys and values
@@ -822,15 +840,19 @@ public class JobHistory {
builder.append(DELIMITER);
}
builder.append(LINE_DELIMITER_CHAR);
-
+
+ String logLine = builder.toString();
for (Iterator<PrintWriter> iter = writers.iterator(); iter.hasNext();) {
PrintWriter out = iter.next();
- out.println(builder.toString());
+ out.println(logLine);
if (out.checkError() && id != null) {
LOG.info("Logging failed for job " + id + "removing PrintWriter from FileManager");
iter.remove();
}
}
+ if (recordType != RecordTypes.Meta) {
+ JobHistoryLogger.LOG.debug(logLine);
+ }
}
/**
@@ -1269,6 +1291,34 @@ public class JobHistory {
}
/**
+ * Get the workflow adjacencies from the job conf
+ * The string returned is of the form "key"="value" "key"="value" ...
+ */
+ public static String getWorkflowAdjacencies(Configuration conf) {
+ int prefixLen = JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
+ Map<String,String> adjacencies =
+ conf.getValByRegex(JobConf.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
+ if (adjacencies.isEmpty())
+ return "";
+ int size = 0;
+ for (Entry<String,String> entry : adjacencies.entrySet()) {
+ int keyLen = entry.getKey().length();
+ size += keyLen - prefixLen;
+ size += entry.getValue().length() + 6;
+ }
+ StringBuilder sb = new StringBuilder(size);
+ for (Entry<String,String> entry : adjacencies.entrySet()) {
+ int keyLen = entry.getKey().length();
+ sb.append("\"");
+ sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
+ sb.append("\"=\"");
+ sb.append(escapeString(entry.getValue()));
+ sb.append("\" ");
+ }
+ return sb.toString();
+ }
+
+ /**
* Get the job history file path given the history filename
*/
public static Path getJobHistoryLogLocation(String logFileName)
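Concretely, for the "key"="value" format produced by getWorkflowAdjacencies above, two
conf entries under the adjacency prefix render as quoted, space-separated pairs. A
hypothetical illustration:

    // Hypothetical input/output for getWorkflowAdjacencies:
    Configuration conf = new Configuration();
    conf.set("mapreduce.workflow.adjacency.A", "B");
    conf.set("mapreduce.workflow.adjacency.B", "C");
    String adj = JobHistory.getWorkflowAdjacencies(conf);
    // adj is now: "A"="B" "B"="C"  -- keys and values run through
    // escapeString, each pair is followed by a space, and pair order
    // follows the map returned by getValByRegex (not guaranteed).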
@@ -1390,17 +1440,16 @@ public class JobHistory {
FileStatus[] statuses = null;
if (dir == DONE) {
- final String snDirectoryComponent
- = serialNumberDirectoryComponent(id);
-
final String scanTail
= (DONE_BEFORE_SERIAL_TAIL
+ "/" + serialNumberDirectoryComponent(id));
- if (DEBUG_MODE) {
- System.err.println("JobHistory.getJobHistoryFileName DONE dir: scanning " + scanTail);
-
- (new IOException("debug exception")).printStackTrace(System.err);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("JobHistory.getJobHistoryFileName DONE dir: scanning "
+ + scanTail);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Thread.currentThread().getStackTrace());
+ }
}
statuses = localGlobber(fs, DONE, scanTail, filter);
@@ -1410,9 +1459,6 @@ public class JobHistory {
String filename = null;
if (statuses == null || statuses.length == 0) {
- if (DEBUG_MODE) {
- System.err.println("Nothing to recover for job " + id);
- }
LOG.info("Nothing to recover for job " + id);
} else {
// return filename considering that fact the name can be a
@@ -1735,11 +1781,21 @@ public class JobHistory {
new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
Keys.SUBMIT_TIME, Keys.JOBCONF,
Keys.VIEW_JOB, Keys.MODIFY_JOB,
- Keys.JOB_QUEUE},
+ Keys.JOB_QUEUE, Keys.WORKFLOW_ID,
+ Keys.WORKFLOW_NAME, Keys.WORKFLOW_NODE_NAME,
+ Keys.WORKFLOW_ADJACENCIES,
+ Keys.WORKFLOW_TAGS},
new String[]{jobId.toString(), jobName, user,
String.valueOf(submitTime) , jobConfPath,
viewJobACL, modifyJobACL,
- jobConf.getQueueName()}, jobId
+ jobConf.getQueueName(),
+ jobConf.get(JobConf.WORKFLOW_ID, ""),
+ jobConf.get(JobConf.WORKFLOW_NAME, ""),
+ jobConf.get(JobConf.WORKFLOW_NODE_NAME, ""),
+ getWorkflowAdjacencies(jobConf),
+ jobConf.get(JobConf.WORKFLOW_TAGS, ""),
+ },
+ jobId
);
}catch(IOException e){
@@ -2140,7 +2196,14 @@ public class JobHistory {
public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
}
-
+
+ @Deprecated
+ public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
+ String trackerName, int httpPort, String taskType) {
+ logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType,
+ Locality.OFF_SWITCH, Avataar.VIRGIN);
+ }
+
/**
* Log start time of this map task attempt.
*
@@ -2148,25 +2211,31 @@ public class JobHistory {
* @param startTime start time of task attempt as reported by task tracker.
* @param trackerName name of the tracker executing the task attempt.
* @param httpPort http port of the task tracker executing the task attempt
- * @param taskType Whether the attempt is cleanup or setup or map
+ * @param taskType Whether the attempt is cleanup or setup or map
+ * @param locality the data locality of the task attempt
+ * @param avataar the avataar of the task attempt
*/
public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
String trackerName, int httpPort,
- String taskType) {
+ String taskType,
+ Locality locality, Avataar avataar) {
JobID id = taskAttemptId.getJobID();
ArrayList<PrintWriter> writer = fileManager.getWriters(id);
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
- Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
- Keys.TRACKER_NAME, Keys.HTTP_PORT},
+ Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+ Keys.TRACKER_NAME, Keys.HTTP_PORT,
+ Keys.LOCALITY, Keys.AVATAAR},
new String[]{taskType,
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
String.valueOf(startTime), trackerName,
- httpPort == -1 ? "" :
- String.valueOf(httpPort)}, id);
+ httpPort == -1 ? "" : String.valueOf(httpPort),
+ locality.toString(), avataar.toString()},
+ id
+ );
}
}
@@ -2328,7 +2397,15 @@ public class JobHistory {
long startTime, String hostName){
logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
}
-
+
+ @Deprecated
+ public static void logStarted(TaskAttemptID taskAttemptId,
+ long startTime, String trackerName,
+ int httpPort,
+ String taskType) {
+ logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType,
+ Locality.OFF_SWITCH, Avataar.VIRGIN);
+ }
/**
* Log start time of Reduce task attempt.
*
@@ -2336,27 +2413,33 @@ public class JobHistory {
* @param startTime start time
* @param trackerName tracker name
* @param httpPort the http port of the tracker executing the task attempt
- * @param taskType Whether the attempt is cleanup or setup or reduce
+ * @param taskType Whether the attempt is cleanup or setup or reduce
+ * @param locality the data locality of the task attempt
+ * @param avataar the avataar of the task attempt
*/
public static void logStarted(TaskAttemptID taskAttemptId,
long startTime, String trackerName,
int httpPort,
- String taskType) {
- JobID id = taskAttemptId.getJobID();
- ArrayList<PrintWriter> writer = fileManager.getWriters(id);
-
- if (null != writer){
- JobHistory.log(writer, RecordTypes.ReduceAttempt,
- new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
- Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
- Keys.TRACKER_NAME, Keys.HTTP_PORT},
- new String[]{taskType,
- taskAttemptId.getTaskID().toString(),
- taskAttemptId.toString(),
- String.valueOf(startTime), trackerName,
- httpPort == -1 ? "" :
- String.valueOf(httpPort)}, id);
- }
+ String taskType,
+ Locality locality, Avataar avataar) {
+ JobID id = taskAttemptId.getJobID();
+ ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+
+ if (null != writer){
+ JobHistory.log(writer, RecordTypes.ReduceAttempt,
+ new Keys[] {Keys.TASK_TYPE, Keys.TASKID,
+ Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+ Keys.TRACKER_NAME, Keys.HTTP_PORT,
+ Keys.LOCALITY, Keys.AVATAAR},
+ new String[]{taskType,
+ taskAttemptId.getTaskID().toString(),
+ taskAttemptId.toString(),
+ String.valueOf(startTime), trackerName,
+ httpPort == -1 ? "" :
+ String.valueOf(httpPort),
+ locality.toString(), avataar.toString()},
+ id);
+ }
}
/**
@@ -2521,26 +2604,26 @@ public class JobHistory {
*/
public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException;
}
-
- static long directoryTime(String year, String seg2, String seg3) {
- // set to current time. In debug mode, this is where the month
- // and day get set.
+
+ /**
+ * Returns the time in milliseconds, truncated to the day.
+ */
+ static long directoryTime(String year, String month, String day) {
Calendar result = Calendar.getInstance();
- // canonicalize by filling in unset fields
- result.setTimeInMillis(System.currentTimeMillis());
+ result.clear();
result.set(Calendar.YEAR, Integer.parseInt(year));
// months are 0-based in Calendar, but people will expect January
// to be month #1 . Therefore the number is bumped before we make the
// directory name and must be debumped to seek the time.
- result.set(DEBUG_MODE ? Calendar.HOUR : Calendar.MONTH,
- Integer.parseInt(seg2) - 1);
+ result.set(Calendar.MONTH, Integer.parseInt(month) - 1);
- result.set(DEBUG_MODE ? Calendar.MINUTE : Calendar.DAY_OF_MONTH,
- Integer.parseInt(seg3));
-
- return result.getTimeInMillis();
+ result.set(Calendar.DAY_OF_MONTH, Integer.parseInt(day));
+
+ // truncate to day granularity
+ long timeInMillis = result.getTimeInMillis();
+ return timeInMillis;
}
/**
@@ -2551,10 +2634,10 @@ public class JobHistory {
*/
public static class HistoryCleaner implements Runnable {
static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
- static final long DIRECTORY_LIFE_IN_MS
- = DEBUG_MODE ? 20 * 60 * 1000L : 30 * ONE_DAY_IN_MS;
- static final long RUN_INTERVAL
- = DEBUG_MODE ? 10L * 60L * 1000L : ONE_DAY_IN_MS;
+ static final long DEFAULT_HISTORY_MAX_AGE = 30 * ONE_DAY_IN_MS;
+ static final long DEFAULT_CLEANUP_FREQUENCY = ONE_DAY_IN_MS;
+ static long cleanupFrequency = DEFAULT_CLEANUP_FREQUENCY;
+ static long maxAgeOfHistoryFiles = DEFAULT_HISTORY_MAX_AGE;
private long now;
private static final AtomicBoolean isRunning = new AtomicBoolean(false);
private static long lastRan = 0;
@@ -2571,12 +2654,15 @@ public class JobHistory {
}
now = System.currentTimeMillis();
// clean history only once a day at max
- if (lastRan != 0 && (now - lastRan) < RUN_INTERVAL) {
+ if (lastRan != 0 && (now - lastRan) < cleanupFrequency) {
isRunning.set(false);
return;
}
lastRan = now;
-
+ clean(now);
+ }
+
+ public void clean(long now) {
Set<String> deletedPathnames = new HashSet<String>();
// XXXXX debug code
@@ -2587,62 +2673,73 @@ public class JobHistory {
Path[] datedDirectories
= FileUtil.stat2Paths(localGlobber(DONEDIR_FS, DONE,
DONE_BEFORE_SERIAL_TAIL, null));
- // find directories older than 30 days
+
+ // any file with a timestamp earlier than cutoff should be deleted
+ long cutoff = now - maxAgeOfHistoryFiles;
+ Calendar cutoffDay = Calendar.getInstance();
+ cutoffDay.setTimeInMillis(cutoff);
+ cutoffDay.set(Calendar.HOUR_OF_DAY, 0);
+ cutoffDay.set(Calendar.MINUTE, 0);
+ cutoffDay.set(Calendar.SECOND, 0);
+ cutoffDay.set(Calendar.MILLISECOND, 0);
+
+ // find directories older than the maximum age
for (int i = 0; i < datedDirectories.length; ++i) {
String thisDir = datedDirectories[i].toString();
Matcher pathMatcher = parseDirectory.matcher(thisDir);
if (pathMatcher.matches()) {
- long dirTime = directoryTime(pathMatcher.group(1),
+ long dirDay = directoryTime(pathMatcher.group(1),
pathMatcher.group(2),
pathMatcher.group(3));
- if (DEBUG_MODE) {
- System.err.println("HistoryCleaner.run just parsed " + thisDir
- + " as year/month/day = " + pathMatcher.group(1)
- + "/" + pathMatcher.group(2) + "/"
- + pathMatcher.group(3));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("HistoryCleaner.run just parsed " + thisDir
+ + " as year/month/day = " + pathMatcher.group(1) + "/"
+ + pathMatcher.group(2) + "/" + pathMatcher.group(3));
}
-
- if (dirTime < now - DIRECTORY_LIFE_IN_MS) {
-
- if (DEBUG_MODE) {
- Calendar then = Calendar.getInstance();
- then.setTimeInMillis(dirTime);
+
+ if (dirDay <= cutoffDay.getTimeInMillis()) {
+ if (LOG.isDebugEnabled()) {
Calendar nnow = Calendar.getInstance();
nnow.setTimeInMillis(now);
+ Calendar then = Calendar.getInstance();
+ then.setTimeInMillis(dirDay);
- System.err.println("HistoryCleaner.run directory: " + thisDir
- + " because its time is " + then
- + " but it's now " + nnow);
- System.err.println("then = " + dirTime);
- System.err.println("now = " + now);
+ LOG.debug("HistoryCleaner.run directory: " + thisDir
+ + " because its time is " + then + " but it's now " + nnow);
}
+ }
+
+ // if dirDay is cutoffDay, some files may be old enough and others not
+ if (dirDay == cutoffDay.getTimeInMillis()) {
+ // remove old enough files in the directory
+ FileStatus[] possibleDeletees = DONEDIR_FS.listStatus(datedDirectories[i]);
+
+ for (int j = 0; j < possibleDeletees.length; ++j) {
+ if (possibleDeletees[j].getModificationTime() < now -
+ maxAgeOfHistoryFiles) {
+ Path deletee = possibleDeletees[j].getPath();
+ if (LOG.isDebugEnabled() && !printedOneDeletee) {
+ LOG.debug("HistoryCleaner.run deletee: "
+ + deletee.toString());
+ printedOneDeletee = true;
+ }
- // remove every file in the directory and save the name
- // so we can remove it from jobHistoryFileMap
- Path[] deletees
- = FileUtil.stat2Paths(localGlobber(DONEDIR_FS,
- datedDirectories[i],
- "/*/*", // sn + individual files
- null));
-
- for (int j = 0; j < deletees.length; ++j) {
-
- if (DEBUG_MODE && !printedOneDeletee) {
- System.err.println("HistoryCleaner.run deletee: " + deletees[j].toString());
- printedOneDeletee = true;
- }
-
- DONEDIR_FS.delete(deletees[j]);
- deletedPathnames.add(deletees[j].toString());
+ DONEDIR_FS.delete(deletee);
+ deletedPathnames.add(deletee.toString());
+ }
}
+ }
+
+ // if the directory is older than cutoffDay, we can flat out
+ // delete it because all the files in it are old enough
+ if (dirDay < cutoffDay.getTimeInMillis()) {
synchronized (existingDoneSubdirs) {
- if (!existingDoneSubdirs.contains(datedDirectories[i]))
- {
- LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
- + datedDirectories[i] + ", but should.");
- }
+ if (!existingDoneSubdirs.contains(datedDirectories[i])) {
+ LOG.warn("JobHistory: existingDoneSubdirs doesn't contain "
+ + datedDirectories[i] + ", but should.");
+ }
DONEDIR_FS.delete(datedDirectories[i], true);
existingDoneSubdirs.remove(datedDirectories[i]);
}
@@ -2657,8 +2754,8 @@ public class JobHistory {
while (it.hasNext()) {
MovedFileInfo info = it.next().getValue();
- if (DEBUG_MODE && !printedOneMovedFile) {
- System.err.println("HistoryCleaner.run a moved file: " + info.historyFile);
+ if (LOG.isDebugEnabled() && !printedOneMovedFile) {
+ LOG.debug("HistoryCleaner.run a moved file: " + info.historyFile);
printedOneMovedFile = true;
}