You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC
svn commit: r830230 [7/9] - in /hadoop/mapreduce/branches/HDFS-641: ./
.eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/
src/contrib/capacity-scheduler/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-sche...
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Oct 27 15:43:58 2009
@@ -20,8 +20,11 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -32,6 +35,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.RunJar;
@@ -59,9 +63,17 @@
LogFactory.getLog(TrackerDistributedCacheManager.class);
private final LocalFileSystem localFs;
+
+ private LocalDirAllocator lDirAllocator;
+
+ private Configuration trackerConf;
+
+ private Random random = new Random();
public TrackerDistributedCacheManager(Configuration conf) throws IOException {
this.localFs = FileSystem.getLocal(conf);
+ this.trackerConf = conf;
+ this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
}
/**
@@ -71,7 +83,7 @@
* @param cache the cache to be localized, this should be specified as
* new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Configuration file which contains the filesystem
- * @param baseDir The base cache Dir where you wnat to localize the
+ * @param subDir The base cache subDir where you want to localize the
* files/archives
* @param fileStatus The file status on the dfs.
* @param isArchive if the cache is an archive or a file. In case it is an
@@ -94,35 +106,55 @@
* @throws IOException
*/
Path getLocalCache(URI cache, Configuration conf,
- Path baseDir, FileStatus fileStatus,
+ String subDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
Path currentWorkDir, boolean honorSymLinkConf)
throws IOException {
- String cacheId = makeRelative(cache, conf);
+ String key = getKey(cache, conf, confFileStamp);
CacheStatus lcacheStatus;
- Path localizedPath;
+ Path localizedPath = null;
synchronized (cachedArchives) {
- lcacheStatus = cachedArchives.get(cacheId);
+ lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
// was never localized
- lcacheStatus = new CacheStatus(baseDir,
- new Path(baseDir, new Path(cacheId)));
- cachedArchives.put(cacheId, lcacheStatus);
- }
-
- synchronized (lcacheStatus) {
+ String cachePath = new Path (subDir,
+ new Path(String.valueOf(random.nextLong()),
+ makeRelative(cache, conf))).toString();
+ Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+ fileStatus.getLen(), trackerConf);
+ lcacheStatus = new CacheStatus(
+ new Path(localPath.toString().replace(cachePath, "")), localPath);
+ cachedArchives.put(key, lcacheStatus);
+ }
+
+ //mark the cache for use.
+ lcacheStatus.refcount++;
+ }
+
+ // do the localization, after releasing the global lock
+ synchronized (lcacheStatus) {
+ if (!lcacheStatus.isInited()) {
localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
- lcacheStatus.refcount++;
+ fileStatus, isArchive);
+ lcacheStatus.initComplete();
+ } else {
+ localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
+ lcacheStatus, fileStatus, isArchive);
}
+ createSymlink(conf, cache, lcacheStatus, isArchive,
+ currentWorkDir, honorSymLinkConf);
}
// try deleting stuff if you can
long size = 0;
- synchronized (baseDirSize) {
- Long get = baseDirSize.get(baseDir);
- if ( get != null ) {
- size = get.longValue();
+ synchronized (lcacheStatus) {
+ synchronized (baseDirSize) {
+ Long get = baseDirSize.get(lcacheStatus.getBaseDir());
+ if ( get != null ) {
+ size = get.longValue();
+ } else {
+ LOG.warn("Cannot find size of baseDir: " + lcacheStatus.getBaseDir());
+ }
}
}
// setting the cache size to a default of 10GB
@@ -142,40 +174,58 @@
* is contained in.
* @throws IOException
*/
- void releaseCache(URI cache, Configuration conf)
+ void releaseCache(URI cache, Configuration conf, long timeStamp)
throws IOException {
- String cacheId = makeRelative(cache, conf);
+ String key = getKey(cache, conf, timeStamp);
synchronized (cachedArchives) {
- CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- if (lcacheStatus == null)
+ CacheStatus lcacheStatus = cachedArchives.get(key);
+ if (lcacheStatus == null) {
+ LOG.warn("Cannot find localized cache: " + cache +
+ " (key: " + key + ") in releaseCache!");
return;
- synchronized (lcacheStatus) {
- lcacheStatus.refcount--;
}
+
+ // decrement ref count
+ lcacheStatus.refcount--;
}
}
// To delete the caches which have a refcount of zero
private void deleteCache(Configuration conf) throws IOException {
+ Set<CacheStatus> deleteSet = new HashSet<CacheStatus>();
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
for (Iterator<String> it = cachedArchives.keySet().iterator();
it.hasNext();) {
String cacheId = it.next();
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
- synchronized (lcacheStatus) {
- if (lcacheStatus.refcount == 0) {
- // delete this cache entry
- FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= lcacheStatus.size;
- baseDirSize.put(lcacheStatus.baseDir, dirSize);
- }
- }
- it.remove();
+
+ // if reference count is zero
+ // mark the cache for deletion
+ if (lcacheStatus.refcount == 0) {
+ // delete this cache entry from the global list
+ // and mark the localized file for deletion
+ deleteSet.add(lcacheStatus);
+ it.remove();
+ }
+ }
+ }
+
+ // do the deletion, after releasing the global lock
+ for (CacheStatus lcacheStatus : deleteSet) {
+ synchronized (lcacheStatus) {
+ FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+ LOG.info("Deleted path " + lcacheStatus.localLoadPath);
+ // decrement the size of the cache from baseDirSize
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+ if ( dirSize != null ) {
+ dirSize -= lcacheStatus.size;
+ baseDirSize.put(lcacheStatus.baseDir, dirSize);
+ } else {
+ LOG.warn("Cannot find record of the baseDir: " +
+ lcacheStatus.baseDir + " during delete!");
}
}
}
@@ -208,6 +258,11 @@
return path;
}
+ String getKey(URI cache, Configuration conf, long timeStamp)
+ throws IOException {
+ return makeRelative(cache, conf) + String.valueOf(timeStamp);
+ }
+
/**
* Returns mtime of a given cache file on hdfs.
*
@@ -224,144 +279,115 @@
return fileSystem.getFileStatus(filePath).getModificationTime();
}
- private Path cacheFilePath(Path p) {
- return new Path(p, p.getName());
- }
+ private Path checkCacheStatusValidity(Configuration conf,
+ URI cache, long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive
+ ) throws IOException {
+ FileSystem fs = FileSystem.get(cache, conf);
+ // An already-initialized cache has to be fresh; a stale one is an error
+ if (!ifExistsAndFresh(conf, fs, cache, confFileStamp,
+ cacheStatus, fileStatus)) {
+ throw new IOException("Stale cache file: " + cacheStatus.localLoadPath +
+ " for cache-file: " + cache);
+ }
- // the method which actually copies the caches locally and unjars/unzips them
- // and does chmod for the files
- private Path localizeCache(Configuration conf,
- URI cache, long confFileStamp,
- CacheStatus cacheStatus,
- FileStatus fileStatus,
- boolean isArchive,
- Path currentWorkDir,
- boolean honorSymLinkConf)
- throws IOException {
+ LOG.info(String.format("Using existing cache of %s->%s",
+ cache.toString(), cacheStatus.localLoadPath));
+ return cacheStatus.localLoadPath;
+ }
+
+ private void createSymlink(Configuration conf, URI cache,
+ CacheStatus cacheStatus, boolean isArchive,
+ Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
if(cache.getFragment() == null) {
doSymlink = false;
}
- FileSystem fs = FileSystem.get(cache, conf);
String link =
currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
File flink = new File(link);
- if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
- cacheStatus, fileStatus)) {
- LOG.info(String.format("Using existing cache of %s->%s",
- cache.toString(), cacheStatus.localLoadPath));
- if (isArchive) {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
-
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(
- cacheFilePath(cacheStatus.localLoadPath).toString(), link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
+ if (doSymlink){
+ if (!flink.exists()) {
+ FileUtil.symLink(cacheStatus.localLoadPath.toString(), link);
}
+ }
+ }
+
+ // the method which actually copies the caches locally and unjars/unzips them
+ // and does chmod for the files
+ private Path localizeCache(Configuration conf,
+ URI cache, long confFileStamp,
+ CacheStatus cacheStatus,
+ FileStatus fileStatus,
+ boolean isArchive)
+ throws IOException {
+ FileSystem fs = FileSystem.get(cache, conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ Path parchive = null;
+ if (isArchive) {
+ parchive = new Path(cacheStatus.localLoadPath,
+ new Path(cacheStatus.localLoadPath.getName()));
} else {
+ parchive = cacheStatus.localLoadPath;
+ }
- // remove the old archive
- // if the old archive cannot be removed since it is being used by another
- // job
- // return null
- if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
- throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
- + " is in use and cannot be refreshed");
-
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(cacheStatus.localLoadPath, true);
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if ( dirSize != null ) {
- dirSize -= cacheStatus.size;
- baseDirSize.put(cacheStatus.baseDir, dirSize);
- }
- }
- Path parchive = new Path(cacheStatus.localLoadPath,
- new Path(cacheStatus.localLoadPath.getName()));
-
- if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
- throw new IOException("Mkdirs failed to create directory " +
- cacheStatus.localLoadPath.toString());
- }
-
- String cacheId = cache.getPath();
- fs.copyToLocalFile(new Path(cacheId), parchive);
- if (isArchive) {
- String tmpArchive = parchive.toString().toLowerCase();
- File srcFile = new File(parchive.toString());
- File destDir = new File(parchive.getParent().toString());
- LOG.info(String.format("Extracting %s to %s",
- srcFile.toString(), destDir.toString()));
- if (tmpArchive.endsWith(".jar")) {
- RunJar.unJar(srcFile, destDir);
- } else if (tmpArchive.endsWith(".zip")) {
- FileUtil.unZip(srcFile, destDir);
- } else if (isTarFile(tmpArchive)) {
- FileUtil.unTar(srcFile, destDir);
- } else {
- LOG.warn(String.format(
+ if (!localFs.mkdirs(parchive.getParent())) {
+ throw new IOException("Mkdirs failed to create directory " +
+ cacheStatus.localLoadPath.toString());
+ }
+
+ String cacheId = cache.getPath();
+ fs.copyToLocalFile(new Path(cacheId), parchive);
+ if (isArchive) {
+ String tmpArchive = parchive.toString().toLowerCase();
+ File srcFile = new File(parchive.toString());
+ File destDir = new File(parchive.getParent().toString());
+ LOG.info(String.format("Extracting %s to %s",
+ srcFile.toString(), destDir.toString()));
+ if (tmpArchive.endsWith(".jar")) {
+ RunJar.unJar(srcFile, destDir);
+ } else if (tmpArchive.endsWith(".zip")) {
+ FileUtil.unZip(srcFile, destDir);
+ } else if (isTarFile(tmpArchive)) {
+ FileUtil.unTar(srcFile, destDir);
+ } else {
+ LOG.warn(String.format(
"Cache file %s specified as archive, but not valid extension.",
srcFile.toString()));
- // else will not do anyhting
- // and copy the file into the dir as it is
- }
+ // else will not do anything
+ // and copy the file into the dir as it is
}
+ }
- long cacheSize =
- FileUtil.getDU(new File(parchive.getParent().toString()));
- cacheStatus.size = cacheSize;
- synchronized (baseDirSize) {
- Long dirSize = baseDirSize.get(cacheStatus.baseDir);
- if( dirSize == null ) {
- dirSize = Long.valueOf(cacheSize);
- } else {
- dirSize += cacheSize;
- }
- baseDirSize.put(cacheStatus.baseDir, dirSize);
+ long cacheSize =
+ FileUtil.getDU(new File(parchive.getParent().toString()));
+ cacheStatus.size = cacheSize;
+ synchronized (baseDirSize) {
+ Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+ if( dirSize == null ) {
+ dirSize = Long.valueOf(cacheSize);
+ } else {
+ dirSize += cacheSize;
}
+ baseDirSize.put(cacheStatus.baseDir, dirSize);
+ }
- // do chmod here
- try {
- //Setting recursive permission to grant everyone read and execute
- FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
- } catch(InterruptedException e) {
+ // do chmod here
+ try {
+ //Setting recursive permission to grant everyone read and execute
+ FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+ } catch(InterruptedException e) {
LOG.warn("Exception in chmod" + e.toString());
- }
-
- // update cacheStatus to reflect the newly cached file
- cacheStatus.currentStatus = true;
- cacheStatus.mtime = getTimestamp(conf, cache);
-
- LOG.info(String.format("Cached %s as %s",
- cache.toString(), cacheStatus.localLoadPath));
}
- if (isArchive){
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheStatus.localLoadPath.toString(),
- link);
- }
- return cacheStatus.localLoadPath;
- }
- else {
- if (doSymlink){
- if (!flink.exists())
- FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
- link);
- }
- return cacheFilePath(cacheStatus.localLoadPath);
- }
+ // update cacheStatus to reflect the newly cached file
+ cacheStatus.mtime = getTimestamp(conf, cache);
+
+ LOG.info(String.format("Cached %s as %s",
+ cache.toString(), cacheStatus.localLoadPath));
+ return cacheStatus.localLoadPath;
}
private static boolean isTarFile(String filename) {
@@ -375,28 +401,22 @@
CacheStatus lcacheStatus,
FileStatus fileStatus)
throws IOException {
- // check for existence of the cache
- if (lcacheStatus.currentStatus == false) {
- return false;
+ long dfsFileStamp;
+ if (fileStatus != null) {
+ dfsFileStamp = fileStatus.getModificationTime();
} else {
- long dfsFileStamp;
- if (fileStatus != null) {
- dfsFileStamp = fileStatus.getModificationTime();
- } else {
- dfsFileStamp = getTimestamp(conf, cache);
- }
+ dfsFileStamp = getTimestamp(conf, cache);
+ }
- // ensure that the file on hdfs hasn't been modified since the job started
- if (dfsFileStamp != confFileStamp) {
- LOG.fatal("File: " + cache + " has changed on HDFS since job started");
- throw new IOException("File: " + cache +
- " has changed on HDFS since job started");
- }
+ // ensure that the file on hdfs hasn't been modified since the job started
+ if (dfsFileStamp != confFileStamp) {
+ LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+ throw new IOException("File: " + cache +
+ " has changed on HDFS since job started");
+ }
- if (dfsFileStamp != lcacheStatus.mtime) {
- // needs refreshing
- return false;
- }
+ if (dfsFileStamp != lcacheStatus.mtime) {
+ return false;
}
return true;
@@ -437,9 +457,6 @@
}
private static class CacheStatus {
- // false, not loaded yet, true is loaded
- boolean currentStatus;
-
// the local load path of this cache
Path localLoadPath;
@@ -455,15 +472,31 @@
// the cache-file modification time
long mtime;
+ // is it initialized ?
+ boolean inited = false;
+
public CacheStatus(Path baseDir, Path localLoadPath) {
super();
- this.currentStatus = false;
this.localLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
this.baseDir = baseDir;
this.size = 0;
}
+
+ Path getBaseDir(){
+ return this.baseDir;
+ }
+
+ // mark it as initialized
+ void initComplete() {
+ inited = true;
+ }
+
+ // is it initialized?
+ boolean isInited() {
+ return inited;
+ }
}
/**
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Tue Oct 27 15:43:58 2009
@@ -18,10 +18,12 @@
package org.apache.hadoop.mapreduce.jobhistory;
+import java.io.Closeable;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.EOFException;
+import java.io.StringBufferInputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
@@ -29,15 +31,16 @@
import org.apache.hadoop.mapreduce.Counters;
import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.Decoder;
-import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
-public class EventReader {
+public class EventReader implements Closeable {
private String version;
private Schema schema;
- private FSDataInputStream in;
+ private DataInputStream in;
private Decoder decoder;
private DatumReader reader;
@@ -57,7 +60,7 @@
* @throws IOException
*/
@SuppressWarnings("deprecation")
- public EventReader(FSDataInputStream in) throws IOException {
+ public EventReader(DataInputStream in) throws IOException {
this.in = in;
this.version = in.readLine();
@@ -65,10 +68,8 @@
throw new IOException("Incompatible event log version: "+version);
this.schema = Schema.parse(in.readLine());
- this.reader =
- new SpecificDatumReader(schema,
- "org.apache.hadoop.mapreduce.jobhistory.Events$");
- this.decoder = new BinaryDecoder(in);
+ this.reader = new SpecificDatumReader(schema);
+ this.decoder = new JsonDecoder(schema, in);
}
/**
@@ -78,10 +79,10 @@
*/
@SuppressWarnings("unchecked")
public HistoryEvent getNextEvent() throws IOException {
- Events.Event wrapper;
+ Event wrapper;
try {
- wrapper = (Events.Event)reader.read(null, decoder);
- } catch (EOFException e) {
+ wrapper = (Event)reader.read(null, decoder);
+ } catch (AvroRuntimeException e) { // at EOF
return null;
}
HistoryEvent result;
@@ -153,6 +154,7 @@
* Close the Event reader
* @throws IOException
*/
+ @Override
public void close() throws IOException {
if (in != null) {
in.close();
@@ -160,12 +162,12 @@
in = null;
}
- static Counters fromAvro(Events.Counters counters) {
+ static Counters fromAvro(JhCounters counters) {
Counters result = new Counters();
- for (Events.CounterGroup g : counters.groups) {
+ for (JhCounterGroup g : counters.groups) {
CounterGroup group =
new CounterGroup(g.name.toString(), g.displayName.toString());
- for (Events.Counter c : g.counts) {
+ for (JhCounter c : g.counts) {
group.addCounter(new Counter(c.name.toString(),
c.displayName.toString(),
c.value));
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Tue Oct 27 15:43:58 2009
@@ -28,7 +28,7 @@
import org.apache.avro.Schema;
import org.apache.avro.io.Encoder;
-import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.JsonEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.generic.GenericData;
@@ -41,27 +41,28 @@
*
*/
class EventWriter {
- static final String VERSION = "Avro-Binary";
+ static final String VERSION = "Avro-Json";
private FSDataOutputStream out;
- private DatumWriter<Object> writer =
- new SpecificDatumWriter(Events.Event._SCHEMA);
+ private DatumWriter<Object> writer = new SpecificDatumWriter(Event.class);
private Encoder encoder;
EventWriter(FSDataOutputStream out) throws IOException {
this.out = out;
out.writeBytes(VERSION);
out.writeBytes("\n");
- out.writeBytes(Events.Event._SCHEMA.toString());
+ out.writeBytes(Event._SCHEMA.toString());
out.writeBytes("\n");
- this.encoder = new BinaryEncoder(out);
+ this.encoder = new JsonEncoder(Event._SCHEMA, out);
}
synchronized void write(HistoryEvent event) throws IOException {
- Events.Event wrapper = new Events.Event();
+ Event wrapper = new Event();
wrapper.type = event.getEventType();
wrapper.event = event.getDatum();
writer.write(wrapper, encoder);
+ encoder.flush();
+ out.writeBytes("\n");
}
void flush() throws IOException {
@@ -74,26 +75,26 @@
}
private static final Schema GROUPS =
- Schema.createArray(Events.CounterGroup._SCHEMA);
+ Schema.createArray(JhCounterGroup._SCHEMA);
private static final Schema COUNTERS =
- Schema.createArray(Events.Counter._SCHEMA);
+ Schema.createArray(JhCounter._SCHEMA);
- static Events.Counters toAvro(Counters counters) {
+ static JhCounters toAvro(Counters counters) {
return toAvro(counters, "COUNTERS");
}
- static Events.Counters toAvro(Counters counters, String name) {
- Events.Counters result = new Events.Counters();
+ static JhCounters toAvro(Counters counters, String name) {
+ JhCounters result = new JhCounters();
result.name = new Utf8(name);
- result.groups = new GenericData.Array<Events.CounterGroup>(0, GROUPS);
+ result.groups = new GenericData.Array<JhCounterGroup>(0, GROUPS);
if (counters == null) return result;
for (CounterGroup group : counters) {
- Events.CounterGroup g = new Events.CounterGroup();
+ JhCounterGroup g = new JhCounterGroup();
g.name = new Utf8(group.getName());
g.displayName = new Utf8(group.getDisplayName());
- g.counts = new GenericData.Array<Events.Counter>(group.size(), COUNTERS);
+ g.counts = new GenericData.Array<JhCounter>(group.size(), COUNTERS);
for (Counter counter : group) {
- Events.Counter c = new Events.Counter();
+ JhCounter c = new JhCounter();
c.name = new Utf8(counter.getName());
c.displayName = new Utf8(counter.getDisplayName());
c.value = counter.getValue();
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Oct 27 15:43:58 2009
@@ -21,7 +21,7 @@
"types": [
- {"type": "record", "name": "Counter",
+ {"type": "record", "name": "JhCounter",
"fields": [
{"name": "name", "type": "string"},
{"name": "displayName", "type": "string"},
@@ -29,18 +29,18 @@
]
},
- {"type": "record", "name": "CounterGroup",
+ {"type": "record", "name": "JhCounterGroup",
"fields": [
{"name": "name", "type": "string"},
{"name": "displayName", "type": "string"},
- {"name": "counts", "type": {"type": "array", "items": "Counter"}}
+ {"name": "counts", "type": {"type": "array", "items": "JhCounter"}}
]
},
- {"type": "record", "name": "Counters",
+ {"type": "record", "name": "JhCounters",
"fields": [
{"name": "name", "type": "string"},
- {"name": "groups", "type": {"type": "array", "items": "CounterGroup"}}
+ {"name": "groups", "type": {"type": "array", "items": "JhCounterGroup"}}
]
},
@@ -52,9 +52,9 @@
{"name": "finishedReduces", "type": "int"},
{"name": "failedMaps", "type": "int"},
{"name": "failedReduces", "type": "int"},
- {"name": "totalCounters", "type": "Counters"},
- {"name": "mapCounters", "type": "Counters"},
- {"name": "reduceCounters", "type": "Counters"}
+ {"name": "totalCounters", "type": "JhCounters"},
+ {"name": "mapCounters", "type": "JhCounters"},
+ {"name": "reduceCounters", "type": "JhCounters"}
]
},
@@ -120,7 +120,7 @@
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "state", "type": "string"},
- {"name": "counters", "type": "Counters"}
+ {"name": "counters", "type": "JhCounters"}
]
},
@@ -135,7 +135,7 @@
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "state", "type": "string"},
- {"name": "counters", "type": "Counters"}
+ {"name": "counters", "type": "JhCounters"}
]
},
@@ -148,7 +148,7 @@
{"name": "finishTime", "type": "long"},
{"name": "hostname", "type": "string"},
{"name": "state", "type": "string"},
- {"name": "counters", "type": "Counters"}
+ {"name": "counters", "type": "JhCounters"}
]
},
@@ -192,7 +192,7 @@
{"name": "taskType", "type": "string"},
{"name": "finishTime", "type": "long"},
{"name": "status", "type": "string"},
- {"name": "counters", "type": "Counters"}
+ {"name": "counters", "type": "JhCounters"}
]
},
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java Tue Oct 27 15:43:58 2009
@@ -27,7 +27,7 @@
public interface HistoryEvent {
/** Return this event's type. */
- Events.EventType getEventType();
+ EventType getEventType();
/** Return the Avro datum wrapped by this. */
Object getDatum();
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -30,7 +30,7 @@
*
*/
public class JobFinishedEvent implements HistoryEvent {
- private Events.JobFinished datum = new Events.JobFinished();
+ private JobFinished datum = new JobFinished();
/**
* Create an event to record successful job completion
@@ -66,9 +66,9 @@
JobFinishedEvent() {}
public Object getDatum() { return datum; }
- public void setDatum(Object datum) { this.datum = (Events.JobFinished)datum; }
- public Events.EventType getEventType() {
- return Events.EventType.JOB_FINISHED;
+ public void setDatum(Object datum) { this.datum = (JobFinished)datum; }
+ public EventType getEventType() {
+ return EventType.JOB_FINISHED;
}
/** Get the Job ID */
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Tue Oct 27 15:43:58 2009
@@ -29,16 +29,15 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
@@ -312,7 +311,7 @@
private void moveOldFiles() throws IOException {
//move the log files remaining from last run to the DONE folder
- //suffix the file name based on Jobtracker identifier so that history
+ //suffix the file name based on Job tracker identifier so that history
//files with same job id don't get over written in case of recovery.
FileStatus[] files = logDirFs.listStatus(logDir);
String jtIdentifier = jobTracker.getTrackerIdentifier();
@@ -324,7 +323,25 @@
}
LOG.info("Moving log file from last run: " + fromPath);
Path toPath = new Path(done, fromPath.getName() + fileSuffix);
- moveToDoneNow(fromPath, toPath);
+ try {
+ moveToDoneNow(fromPath, toPath);
+ } catch (ChecksumException e) {
+ // If there is an exception moving the file to done because of
+ // a checksum exception, just delete it
+ LOG.warn("Unable to move " + fromPath +", deleting it");
+ try {
+ boolean b = logDirFs.delete(fromPath, false);
+ LOG.debug("Deletion of corrupt file " + fromPath + " returned " + b);
+ } catch (IOException ioe) {
+ // Cannot delete either? Just log and carry on
+ LOG.warn("Unable to delete " + fromPath + "Exception: " +
+ ioe.getMessage());
+ }
+ } catch (IOException e) {
+ // Exceptions other than checksum, just log and continue
+ LOG.warn("Error moving file " + fromPath + " to done folder." +
+ "Ignoring.");
+ }
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Oct 27 15:43:58 2009
@@ -105,7 +105,7 @@
}
private void handleEvent(HistoryEvent event) throws IOException {
- Events.EventType type = event.getEventType();
+ EventType type = event.getEventType();
switch (type) {
case JOB_SUBMITTED:
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
* a job
*/
public class JobInfoChangeEvent implements HistoryEvent {
- private Events.JobInfoChange datum = new Events.JobInfoChange();
+ private JobInfoChange datum = new JobInfoChange();
/**
* Create a event to record the submit and launch time of a job
@@ -47,7 +47,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.JobInfoChange)datum;
+ this.datum = (JobInfoChange)datum;
}
/** Get the Job ID */
@@ -57,8 +57,8 @@
/** Get the Job launch time */
public long getLaunchTime() { return datum.launchTime; }
- public Events.EventType getEventType() {
- return Events.EventType.JOB_INFO_CHANGED;
+ public EventType getEventType() {
+ return EventType.JOB_INFO_CHANGED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
*
*/
public class JobInitedEvent implements HistoryEvent {
- private Events.JobInited datum = new Events.JobInited();
+ private JobInited datum = new JobInited();
/**
* Create an event to record job initialization
@@ -51,7 +51,7 @@
JobInitedEvent() { }
public Object getDatum() { return datum; }
- public void setDatum(Object datum) { this.datum = (Events.JobInited)datum; }
+ public void setDatum(Object datum) { this.datum = (JobInited)datum; }
/** Get the job ID */
public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
@@ -64,8 +64,8 @@
/** Get the status */
public String getStatus() { return datum.jobStatus.toString(); }
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.JOB_INITED;
+ public EventType getEventType() {
+ return EventType.JOB_INITED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java Tue Oct 27 15:43:58 2009
@@ -30,7 +30,7 @@
*
*/
public class JobPriorityChangeEvent implements HistoryEvent {
- private Events.JobPriorityChange datum = new Events.JobPriorityChange();
+ private JobPriorityChange datum = new JobPriorityChange();
/** Generate an event to record changes in Job priority
* @param id Job Id
@@ -45,7 +45,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.JobPriorityChange)datum;
+ this.datum = (JobPriorityChange)datum;
}
/** Get the Job ID */
@@ -55,8 +55,8 @@
return JobPriority.valueOf(datum.priority.toString());
}
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.JOB_PRIORITY_CHANGED;
+ public EventType getEventType() {
+ return EventType.JOB_PRIORITY_CHANGED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
*
*/
public class JobStatusChangedEvent implements HistoryEvent {
- private Events.JobStatusChanged datum = new Events.JobStatusChanged();
+ private JobStatusChanged datum = new JobStatusChanged();
/**
* Create an event to record the change in the Job Status
@@ -45,7 +45,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.JobStatusChanged)datum;
+ this.datum = (JobStatusChanged)datum;
}
/** Get the Job Id */
@@ -53,8 +53,8 @@
/** Get the event status */
public String getStatus() { return datum.jobStatus.toString(); }
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.JOB_STATUS_CHANGED;
+ public EventType getEventType() {
+ return EventType.JOB_STATUS_CHANGED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
*
*/
public class JobSubmittedEvent implements HistoryEvent {
- private Events.JobSubmitted datum = new Events.JobSubmitted();
+ private JobSubmitted datum = new JobSubmitted();
/**
* Create an event to record job submission
@@ -52,7 +52,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.JobSubmitted)datum;
+ this.datum = (JobSubmitted)datum;
}
/** Get the Job Id */
@@ -66,6 +66,6 @@
/** Get the Path for the Job Configuration file */
public String getJobConfPath() { return datum.jobConfPath.toString(); }
/** Get the event type */
- public Events.EventType getEventType() { return Events.EventType.JOB_SUBMITTED; }
+ public EventType getEventType() { return EventType.JOB_SUBMITTED; }
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java Tue Oct 27 15:43:58 2009
@@ -29,8 +29,8 @@
*
*/
public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
- private Events.JobUnsuccessfulCompletion datum
- = new Events.JobUnsuccessfulCompletion();
+ private JobUnsuccessfulCompletion datum
+ = new JobUnsuccessfulCompletion();
/**
* Create an event to record unsuccessful completion (killed/failed) of jobs
@@ -54,7 +54,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.JobUnsuccessfulCompletion)datum;
+ this.datum = (JobUnsuccessfulCompletion)datum;
}
/** Get the Job ID */
@@ -68,11 +68,11 @@
/** Get the status */
public String getStatus() { return datum.jobStatus.toString(); }
/** Get the event type */
- public Events.EventType getEventType() {
+ public EventType getEventType() {
if ("FAILED".equals(getStatus())) {
- return Events.EventType.JOB_FAILED;
+ return EventType.JOB_FAILED;
} else
- return Events.EventType.JOB_KILLED;
+ return EventType.JOB_KILLED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -32,7 +32,7 @@
*
*/
public class MapAttemptFinishedEvent implements HistoryEvent {
- private Events.MapAttemptFinished datum = new Events.MapAttemptFinished();
+ private MapAttemptFinished datum = new MapAttemptFinished();
/**
* Create an event for successful completion of map attempts
@@ -64,7 +64,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.MapAttemptFinished)datum;
+ this.datum = (MapAttemptFinished)datum;
}
/** Get the task ID */
@@ -90,8 +90,8 @@
/** Get the counters */
Counters getCounters() { return EventReader.fromAvro(datum.counters); }
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.MAP_ATTEMPT_FINISHED;
+ public EventType getEventType() {
+ return EventType.MAP_ATTEMPT_FINISHED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -32,8 +32,8 @@
*
*/
public class ReduceAttemptFinishedEvent implements HistoryEvent {
- private Events.ReduceAttemptFinished datum =
- new Events.ReduceAttemptFinished();
+ private ReduceAttemptFinished datum =
+ new ReduceAttemptFinished();
/**
* Create an event to record completion of a reduce attempt
@@ -68,7 +68,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.ReduceAttemptFinished)datum;
+ this.datum = (ReduceAttemptFinished)datum;
}
/** Get the Task ID */
@@ -96,8 +96,8 @@
/** Get the counters for the attempt */
Counters getCounters() { return EventReader.fromAvro(datum.counters); }
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.REDUCE_ATTEMPT_FINISHED;
+ public EventType getEventType() {
+ return EventType.REDUCE_ATTEMPT_FINISHED;
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -32,7 +32,7 @@
*
*/
public class TaskAttemptFinishedEvent implements HistoryEvent {
- private Events.TaskAttemptFinished datum = new Events.TaskAttemptFinished();
+ private TaskAttemptFinished datum = new TaskAttemptFinished();
/**
* Create an event to record successful finishes for setup and cleanup
@@ -63,7 +63,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.TaskAttemptFinished)datum;
+ this.datum = (TaskAttemptFinished)datum;
}
/** Get the task ID */
@@ -87,8 +87,8 @@
/** Get the counters for the attempt */
Counters getCounters() { return EventReader.fromAvro(datum.counters); }
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.MAP_ATTEMPT_FINISHED;
+ public EventType getEventType() {
+ return EventType.MAP_ATTEMPT_FINISHED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Tue Oct 27 15:43:58 2009
@@ -31,7 +31,7 @@
*
*/
public class TaskAttemptStartedEvent implements HistoryEvent {
- private Events.TaskAttemptStarted datum = new Events.TaskAttemptStarted();
+ private TaskAttemptStarted datum = new TaskAttemptStarted();
/**
* Create an event to record the start of an attempt
@@ -56,7 +56,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.TaskAttemptStarted)datum;
+ this.datum = (TaskAttemptStarted)datum;
}
/** Get the task id */
@@ -76,8 +76,8 @@
return TaskAttemptID.forName(datum.attemptId.toString());
}
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.MAP_ATTEMPT_STARTED;
+ public EventType getEventType() {
+ return EventType.MAP_ATTEMPT_STARTED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Tue Oct 27 15:43:58 2009
@@ -31,8 +31,8 @@
*
*/
public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
- private Events.TaskAttemptUnsuccessfulCompletion datum =
- new Events.TaskAttemptUnsuccessfulCompletion();
+ private TaskAttemptUnsuccessfulCompletion datum =
+ new TaskAttemptUnsuccessfulCompletion();
/**
* Create an event to record the unsuccessful completion of attempts
@@ -60,7 +60,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.TaskAttemptUnsuccessfulCompletion)datum;
+ this.datum = (TaskAttemptUnsuccessfulCompletion)datum;
}
/** Get the task id */
@@ -82,8 +82,8 @@
/** Get the task status */
public String getTaskStatus() { return datum.status.toString(); }
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.MAP_ATTEMPT_KILLED;
+ public EventType getEventType() {
+ return EventType.MAP_ATTEMPT_KILLED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java Tue Oct 27 15:43:58 2009
@@ -31,7 +31,7 @@
*
*/
public class TaskFailedEvent implements HistoryEvent {
- private Events.TaskFailed datum = new Events.TaskFailed();
+ private TaskFailed datum = new TaskFailed();
/**
* Create an event to record task failure
@@ -58,7 +58,7 @@
TaskFailedEvent() {}
public Object getDatum() { return datum; }
- public void setDatum(Object datum) { this.datum = (Events.TaskFailed)datum; }
+ public void setDatum(Object datum) { this.datum = (TaskFailed)datum; }
/** Get the task id */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
@@ -79,7 +79,7 @@
/** Get the task status */
public String getTaskStatus() { return datum.status.toString(); }
/** Get the event type */
- public Events.EventType getEventType() { return Events.EventType.TASK_FAILED; }
+ public EventType getEventType() { return EventType.TASK_FAILED; }
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java Tue Oct 27 15:43:58 2009
@@ -31,7 +31,7 @@
*
*/
public class TaskFinishedEvent implements HistoryEvent {
- private Events.TaskFinished datum = new Events.TaskFinished();
+ private TaskFinished datum = new TaskFinished();
/**
* Create an event to record the successful completion of a task
@@ -55,7 +55,7 @@
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
- this.datum = (Events.TaskFinished)datum;
+ this.datum = (TaskFinished)datum;
}
/** Get task id */
@@ -71,8 +71,8 @@
/** Get task status */
public String getTaskStatus() { return datum.status.toString(); }
/** Get event type */
- public Events.EventType getEventType() {
- return Events.EventType.TASK_FINISHED;
+ public EventType getEventType() {
+ return EventType.TASK_FINISHED;
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java Tue Oct 27 15:43:58 2009
@@ -30,7 +30,7 @@
*
*/
public class TaskStartedEvent implements HistoryEvent {
- private Events.TaskStarted datum = new Events.TaskStarted();
+ private TaskStarted datum = new TaskStarted();
/**
* Create an event to record start of a task
@@ -50,7 +50,7 @@
TaskStartedEvent() {}
public Object getDatum() { return datum; }
- public void setDatum(Object datum) { this.datum = (Events.TaskStarted)datum; }
+ public void setDatum(Object datum) { this.datum = (TaskStarted)datum; }
/** Get the task id */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
@@ -63,8 +63,8 @@
return TaskType.valueOf(datum.taskType.toString());
}
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.TASK_STARTED;
+ public EventType getEventType() {
+ return EventType.TASK_STARTED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java Tue Oct 27 15:43:58 2009
@@ -29,7 +29,7 @@
*
*/
public class TaskUpdatedEvent implements HistoryEvent {
- private Events.TaskUpdated datum = new Events.TaskUpdated();
+ private TaskUpdated datum = new TaskUpdated();
/**
* Create an event to record task updates
@@ -44,15 +44,15 @@
TaskUpdatedEvent() {}
public Object getDatum() { return datum; }
- public void setDatum(Object datum) { this.datum = (Events.TaskUpdated)datum; }
+ public void setDatum(Object datum) { this.datum = (TaskUpdated)datum; }
/** Get the task ID */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
/** Get the task finish time */
public long getFinishTime() { return datum.finishTime; }
/** Get the event type */
- public Events.EventType getEventType() {
- return Events.EventType.TASK_UPDATED;
+ public EventType getEventType() {
+ return EventType.TASK_UPDATED;
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java Tue Oct 27 15:43:58 2009
@@ -70,7 +70,7 @@
private Connection connection;
- private PreparedStatement statement;
+ protected PreparedStatement statement;
private DBConfiguration dbConf;
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDBRecordReader.java Tue Oct 27 15:43:58 2009
@@ -38,10 +38,9 @@
// Execute statements for mysql in unbuffered mode.
protected ResultSet executeQuery(String query) throws SQLException {
- PreparedStatement statement = getConnection().prepareStatement(query,
+ statement = getConnection().prepareStatement(query,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
- setStatement(statement); // save a ref for cleanup in close()
return statement.executeQuery();
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/db/MySQLDataDrivenDBRecordReader.java Tue Oct 27 15:43:58 2009
@@ -39,10 +39,9 @@
// Execute statements for mysql in unbuffered mode.
protected ResultSet executeQuery(String query) throws SQLException {
- PreparedStatement statement = getConnection().prepareStatement(query,
+ statement = getConnection().prepareStatement(query,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(Integer.MIN_VALUE); // MySQL: read row-at-a-time.
- setStatement(statement); // save a ref so the close() method cleans this up.
return statement.executeQuery();
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Tue Oct 27 15:43:58 2009
@@ -23,10 +23,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -43,6 +45,9 @@
* Temporary directory name
*/
protected static final String TEMP_DIR_NAME = "_temporary";
+ public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+ static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
private FileSystem outputFileSystem = null;
private Path outputPath = null;
private Path workPath = null;
@@ -80,21 +85,59 @@
}
}
+ // True if the job requires output.dir marked on successful job.
+ // Note that by default it is set to true.
+ private boolean shouldMarkOutputDir(Configuration conf) {
+ return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+ }
+
+ // Create a _SUCCESS file in the job's output dir
+ private void markOutputDirSuccessful(JobContext context) throws IOException {
+ if (outputPath != null) {
+ // create a file in the output folder to mark the job completion
+ Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+ outputFileSystem.create(filePath).close();
+ }
+ }
+
/**
* Delete the temporary directory, including all of the work directories.
+ * Create a _SUCCESS file to mark it as successful.
* @param context the job's context
*/
- public void cleanupJob(JobContext context) throws IOException {
+ public void commitJob(JobContext context) throws IOException {
+ // delete the _temporary folder and create a _SUCCESS file in the output folder
+ cleanup(context);
+ if (shouldMarkOutputDir(context.getConfiguration())) {
+ markOutputDirSuccessful(context);
+ }
+ }
+
+ // Delete the _temporary folder in the output dir.
+ private void cleanup(JobContext context) throws IOException {
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
if (fileSys.exists(tmpDir)) {
fileSys.delete(tmpDir, true);
}
+ } else {
+ LOG.warn("Output Path is null in cleanup");
}
}
/**
+ * Delete the temporary directory, including all of the work directories.
+ * @param context the job's context
+ */
+ @Override
+ public void abortJob(JobContext context, JobStatus.State state)
+ throws IOException {
+ // delete the _temporary folder
+ cleanup(context);
+ }
+
+ /**
* No task setup required.
*/
@Override
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Tue Oct 27 15:43:58 2009
@@ -85,8 +85,10 @@
* Version 27: Changed protocol to use new api objects. And the protocol is
* renamed from JobSubmissionProtocol to ClientProtocol.
* Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
+ * Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
+ * to ClusterMetrics as part of MAPREDUCE-1048.
*/
- public static final long versionID = 28L;
+ public static final long versionID = 29L;
/**
* Allocate a name for the job.
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Tue Oct 27 15:43:58 2009
@@ -41,6 +41,8 @@
"mapreduce.jobtracker.maxtasks.perjob";
public static final String JT_HEARTBEATS_IN_SECOND =
"mapreduce.jobtracker.heartbeats.in.second";
+ public static final String JT_HEARTBEATS_SCALING_FACTOR =
+ "mapreduce.jobtracker.heartbeats.scaling.factor";
public static final String JT_PERSIST_JOBSTATUS =
"mapreduce.jobtracker.persist.jobstatus.active";
public static final String JT_PERSIST_JOBSTATUS_HOURS =
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Tue Oct 27 15:43:58 2009
@@ -75,5 +75,6 @@
"mapreduce.tasktracker.taskmemorymanager.monitoringinterval";
public static final String TT_LOCAL_CACHE_SIZE =
"mapreduce.tasktracker.cache.local.size";
-
+ public static final String TT_OUTOFBAND_HEARBEAT =
+ "mapreduce.tasktracker.outofband.heartbeat";
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapreduce/util/ProcessTree.java Tue Oct 27 15:43:58 2009
@@ -50,8 +50,8 @@
setsidSupported = false;
} finally { // handle the exit code
LOG.info("setsid exited with exit code " + shexec.getExitCode());
- return setsidSupported;
}
+ return setsidSupported;
}
/**
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java Tue Oct 27 15:43:58 2009
@@ -25,6 +25,7 @@
* {@link org.apache.hadoop.mapreduce.util.LinuxMemoryCalculatorPlugin}
* instead
*/
+@Deprecated
public class LinuxMemoryCalculatorPlugin extends
org.apache.hadoop.mapreduce.util.LinuxMemoryCalculatorPlugin {
// Inherits everything from the super class
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/findbugsExcludeFile.xml Tue Oct 27 15:43:58 2009
@@ -16,6 +16,10 @@
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
<Match>
+ <Class name="~.*_jspx" />
+ <Bug pattern="DLS_DEAD_LOCAL_STORE" />
+ </Match>
+ <Match>
<Field name="_jspx_dependants" />
<Bug pattern="UWF_UNWRITTEN_FIELD" />
</Match>
@@ -64,6 +68,14 @@
<Class name="org.apache.hadoop.mapred.SequenceFileInputFilter$Filter" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
</Match>
+ <Match>
+ <Class name="~org.apache.hadoop.util.*" />
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+ </Match>
+ <Match>
+ <Class name="~org.apache.hadoop.filecache.*" />
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+ </Match>
<!--
Ignore warnings for usage of System.exit. This is
required and have been well thought out
@@ -116,6 +128,20 @@
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+ <Or>
+ <Method name="abortJob" />
+ <Method name="commitJob" />
+ <Method name="cleanupJob" />
+ </Or>
+ <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+ </Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat$DBRecordReader" />
+ <Method name="next" />
+ <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+ </Match>
<!--
Ignoring this warning as resolving this would need a non-trivial change in code
-->
@@ -163,6 +189,11 @@
<Field name="inputs" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.JobTracker" />
+ <Method name="updateTaskTrackerStatus" />
+ <Bug pattern="DLS_DEAD_LOCAL_STORE" />
+ </Match>
<!--
This class is unlikely to get subclassed, so ignore
Propchange: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 27 15:43:58 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
/hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:817879-818559
+/hadoop/mapreduce/trunk/src/test/mapred:817878-830225
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java Tue Oct 27 15:43:58 2009
@@ -17,14 +17,15 @@
*/
package org.apache.hadoop.conf;
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
-public class TestJobConf extends TestCase {
+public class TestJobConf {
+ @Test
public void testProfileParamsDefaults() {
JobConf configuration = new JobConf();
@@ -37,6 +38,7 @@
Assert.assertTrue(result.startsWith("-agentlib:hprof"));
}
+ @Test
public void testProfileParamsSetter() {
JobConf configuration = new JobConf();
@@ -44,6 +46,7 @@
Assert.assertEquals("test", configuration.get(JobContext.TASK_PROFILE_PARAMS));
}
+ @Test
public void testProfileParamsGetter() {
JobConf configuration = new JobConf();
@@ -55,6 +58,7 @@
* Testing mapred.task.maxvmem replacement with new values
*
*/
+ @Test
public void testMemoryConfigForMapOrReduceTask(){
JobConf configuration = new JobConf();
configuration.set(JobContext.MAP_MEMORY_MB,String.valueOf(300));
@@ -71,9 +75,9 @@
configuration = new JobConf();
configuration.set("mapred.task.maxvmem" , "-1");
configuration.set(JobContext.MAP_MEMORY_MB,String.valueOf(300));
- configuration.set(JobContext.REDUCE_MEMORY_MB,String.valueOf(300));
- Assert.assertEquals(configuration.getMemoryForMapTask(),-1);
- Assert.assertEquals(configuration.getMemoryForReduceTask(),-1);
+ configuration.set(JobContext.REDUCE_MEMORY_MB,String.valueOf(400));
+ Assert.assertEquals(configuration.getMemoryForMapTask(), 300);
+ Assert.assertEquals(configuration.getMemoryForReduceTask(), 400);
configuration = new JobConf();
configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
@@ -91,13 +95,53 @@
configuration = new JobConf();
configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
+ configuration.set(JobContext.MAP_MEMORY_MB, "3");
+ configuration.set(JobContext.REDUCE_MEMORY_MB, "3");
Assert.assertEquals(configuration.getMemoryForMapTask(),2);
Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
+
}
/**
+ * Test that negative values for MAPRED_TASK_MAXVMEM_PROPERTY cause
+ * new configuration keys' values to be used.
+ */
+ @Test
+ public void testNegativeValueForTaskVmem() {
+ JobConf configuration = new JobConf();
+
+ configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-3");
+ configuration.set(JobContext.MAP_MEMORY_MB, "4");
+ configuration.set(JobContext.REDUCE_MEMORY_MB, "5");
+ Assert.assertEquals(4, configuration.getMemoryForMapTask());
+ Assert.assertEquals(5, configuration.getMemoryForReduceTask());
+
+ }
+
+ /**
+ * Test that negative values for all memory configuration properties cause
+ * APIs to disable memory limits
+ */
+ @Test
+ public void testNegativeValuesForMemoryParams() {
+ JobConf configuration = new JobConf();
+
+ configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-4");
+ configuration.set(JobContext.MAP_MEMORY_MB, "-5");
+ configuration.set(JobContext.REDUCE_MEMORY_MB, "-6");
+
+ Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+ configuration.getMemoryForMapTask());
+ Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+ configuration.getMemoryForReduceTask());
+ Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+ configuration.getMaxVirtualMemoryForTask());
+ }
+
+ /**
* Test deprecated accessor and mutator method for mapred.task.maxvmem
*/
+ @Test
public void testMaxVirtualMemoryForTask() {
JobConf configuration = new JobConf();
@@ -138,5 +182,7 @@
configuration.setMaxVirtualMemoryForTask(2 * 1024 * 1024);
Assert.assertEquals(configuration.getMemoryForMapTask(), 2);
Assert.assertEquals(configuration.getMemoryForReduceTask(), 2);
+
+
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Tue Oct 27 15:43:58 2009
@@ -84,7 +84,7 @@
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
- new OutputLogFilter()));
+ new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Oct 27 15:43:58 2009
@@ -257,7 +257,9 @@
*/
protected void assertOwnerShip(Path outDir, FileSystem fs)
throws IOException {
- for (FileStatus status : fs.listStatus(outDir, new OutputLogFilter())) {
+ for (FileStatus status : fs.listStatus(outDir,
+ new Utils.OutputFileUtils
+ .OutputFilesFilter())) {
String owner = status.getOwner();
String group = status.getGroup();
LOG.info("Ownership of the file is " + status.getPath() + " is " + owner
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Tue Oct 27 15:43:58 2009
@@ -31,6 +31,7 @@
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
@@ -188,6 +189,7 @@
String taskTracker) {
addRunningTaskToTIP(tip, taskId, new TaskTrackerStatus(taskTracker,
JobInProgress.convertTrackerNameToHostName(taskTracker)), true);
+
TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
@@ -221,7 +223,7 @@
}
static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status,
- boolean initialContact,
+ boolean initialContact, boolean acceptNewTasks,
String tracker, short responseId)
throws IOException {
if (status == null) {
@@ -229,15 +231,39 @@
JobInProgress.convertTrackerNameToHostName(tracker));
}
- jt.heartbeat(status, false, initialContact, false, responseId);
+ jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
return ++responseId ;
}
static void establishFirstContact(JobTracker jt, String tracker)
throws IOException {
- sendHeartBeat(jt, null, true, tracker, (short) 0);
+ sendHeartBeat(jt, null, true, false, tracker, (short) 0);
}
+ static class FakeTaskInProgress extends TaskInProgress {
+
+ public FakeTaskInProgress(JobID jobId, String jobFile, int numMaps,
+ int partition, JobTracker jobTracker, JobConf conf, JobInProgress job,
+ int numSlotsRequired) {
+ super(jobId, jobFile, numMaps, partition, jobTracker, conf, job,
+ numSlotsRequired);
+ }
+
+ public FakeTaskInProgress(JobID jobId, String jobFile, RawSplit emptySplit,
+ JobTracker jobTracker, JobConf jobConf,
+ JobInProgress job, int partition, int numSlotsRequired) {
+ super(jobId, jobFile, emptySplit, jobTracker, jobConf, job,
+ partition, numSlotsRequired);
+ }
+
+ @Override
+ synchronized boolean updateStatus(TaskStatus status) {
+ TaskAttemptID taskid = status.getTaskID();
+ taskStatuses.put(taskid, status);
+ return false;
+ }
+ }
+
static class FakeJobHistory extends JobHistory {
@Override
public void init(JobTracker jt,
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java Tue Oct 27 15:43:58 2009
@@ -96,7 +96,8 @@
public static class NotificationServlet extends HttpServlet {
public static int counter = 0;
-
+ private static final long serialVersionUID = 1L;
+
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
switch (counter) {
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java Tue Oct 27 15:43:58 2009
@@ -167,7 +167,7 @@
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
- new OutputLogFilter()));
+ new Utils.OutputFileUtils.OutputFilesFilter()));
List<String> mapperOutput=getProcessed(input, mapperBadRecords);
LOG.debug("mapperOutput " + mapperOutput.size());
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Tue Oct 27 15:43:58 2009
@@ -64,7 +64,7 @@
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
- new OutputLogFilter()));
+ new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java Tue Oct 27 15:43:58 2009
@@ -127,7 +127,7 @@
public void reduce(IntWritable key, Iterator<Writable> values,
OutputCollector<IntWritable, Text> out,
Reporter reporter) throws IOException {
- int currentKey = ((IntWritable)(key)).get();
+ int currentKey = key.get();
// keys should be in descending order
if (currentKey > lastKey) {
fail("Keys not in sorted descending order");