You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/03/10 19:56:07 UTC
hadoop git commit: YARN-4696. Improving EntityGroupFSTimelineStore on
exception handling, test setup,
and concurrency. (Steve Loughran via gtcarrera9)
Repository: hadoop
Updated Branches:
refs/heads/trunk 318c9b68b -> d49cfb350
YARN-4696. Improving EntityGroupFSTimelineStore on exception handling, test setup, and concurrency. (Steve Loughran via gtcarrera9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d49cfb35
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d49cfb35
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d49cfb35
Branch: refs/heads/trunk
Commit: d49cfb350454c2dfa2f3eb70f79b6d5030ce7bec
Parents: 318c9b6
Author: Li Lu <gt...@apache.org>
Authored: Thu Mar 10 10:51:55 2016 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Thu Mar 10 10:51:55 2016 -0800
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 6 +
.../hadoop/yarn/client/api/TimelineClient.java | 4 +-
.../api/impl/FileSystemTimelineWriter.java | 51 ++---
.../client/api/impl/TimelineClientImpl.java | 13 ++
.../yarn/client/api/impl/TimelineWriter.java | 40 +++-
.../timeline/webapp/TimelineWebServices.java | 12 +-
.../yarn/server/timeline/EntityCacheItem.java | 40 ++--
.../timeline/EntityGroupFSTimelineStore.java | 204 ++++++++++++++-----
.../hadoop/yarn/server/timeline/LogInfo.java | 11 +-
.../TestEntityGroupFSTimelineStore.java | 8 +-
10 files changed, 279 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 61d1d72..ff4b493 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1747,6 +1747,12 @@ public class YarnConfiguration extends Configuration {
public static final long
TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
+ // This is a temporary solution. The configuration will be deleted once we have
+ // the FileSystem API to check whether append operation is supported or not.
+ public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ = TIMELINE_SERVICE_PREFIX
+ + "entity-file.fs-support-append";
+
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index 258b9f5..09298b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api;
+import java.io.Flushable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -41,7 +42,8 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
*/
@Public
@Evolving
-public abstract class TimelineClient extends AbstractService {
+public abstract class TimelineClient extends AbstractService implements
+ Flushable {
/**
* Create a timeline client. The current UGI when the user initialize the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index aa1f1f8..9e719b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.Closeable;
+import java.io.FileNotFoundException;
import java.io.Flushable;
import java.io.IOException;
import java.net.URI;
@@ -78,12 +79,6 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private static final Log LOG = LogFactory
.getLog(FileSystemTimelineWriter.class);
- // This is temporary solution. The configuration will be deleted once we have
- // the FileSystem API to check whether append operation is supported or not.
- private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
- = YarnConfiguration.TIMELINE_SERVICE_PREFIX
- + "entity-file.fs-support-append";
-
// App log directory must be readable by group so server can access logs
// and writable by group so it can be deleted by server
private static final short APP_LOG_DIR_PERMISSIONS = 0770;
@@ -122,20 +117,10 @@ public class FileSystemTimelineWriter extends TimelineWriter{
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+ fs = FileSystem.newInstance(activePath.toUri(), fsConf);
- String scheme = activePath.toUri().getScheme();
- if (scheme == null) {
- scheme = FileSystem.getDefaultUri(fsConf).getScheme();
- }
- if (scheme != null) {
- String disableCacheName = String.format("fs.%s.impl.disable.cache",
- scheme);
- fsConf.setBoolean(disableCacheName, true);
- }
-
- fs = activePath.getFileSystem(fsConf);
if (!fs.exists(activePath)) {
- throw new IOException(activePath + " does not exist");
+ throw new FileNotFoundException(activePath + " does not exist");
}
summaryEntityTypes = new HashSet<String>(
@@ -168,7 +153,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
timerTaskTTL);
this.isAppendSupported =
- conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+ conf.getBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
objMapper = createObjectMapper();
@@ -181,7 +167,7 @@ public class FileSystemTimelineWriter extends TimelineWriter{
+ "=" + cleanIntervalSecs + ", " +
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
+ "=" + ttl + ", " +
- TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ "=" + isAppendSupported + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+ "=" + activePath);
@@ -196,6 +182,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
}
@Override
+ public String toString() {
+ return "FileSystemTimelineWriter writing to " + activePath;
+ }
+
+ @Override
public TimelinePutResponse putEntities(
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
TimelineEntity... entities) throws IOException, YarnException {
@@ -263,9 +254,20 @@ public class FileSystemTimelineWriter extends TimelineWriter{
}
@Override
- public void close() throws Exception {
- if (this.logFDsCache != null) {
- this.logFDsCache.close();
+ public synchronized void close() throws Exception {
+ if (logFDsCache != null) {
+ LOG.debug("Closing cache");
+ logFDsCache.flush();
+ logFDsCache.close();
+ logFDsCache = null;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (logFDsCache != null) {
+ LOG.debug("Flushing cache");
+ logFDsCache.flush();
}
}
@@ -333,6 +335,9 @@ public class FileSystemTimelineWriter extends TimelineWriter{
if (writerClosed()) {
prepareForWrite();
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing entity list of size " + entities.size());
+ }
for (TimelineEntity entity : entities) {
getObjectMapper().writeValue(getJsonGenerator(), entity);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 195a661..ef46229 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -326,6 +326,13 @@ public class TimelineClientImpl extends TimelineClient {
}
@Override
+ public void flush() throws IOException {
+ if (timelineWriter != null) {
+ timelineWriter.flush();
+ }
+ }
+
+ @Override
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
return timelineWriter.putEntities(entities);
@@ -432,6 +439,12 @@ public class TimelineClientImpl extends TimelineClient {
operateDelegationToken(cancelDTAction);
}
+ @Override
+ public String toString() {
+ return super.toString() + " with timeline server " + resURI
+ + " and writer " + timelineWriter;
+ }
+
private Object operateDelegationToken(
final PrivilegedExceptionAction<?> action)
throws IOException, YarnException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
index c616e63..9590f4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.client.api.impl;
+import java.io.Flushable;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
@@ -48,7 +50,7 @@ import com.sun.jersey.api.client.WebResource;
*/
@Private
@Unstable
-public abstract class TimelineWriter {
+public abstract class TimelineWriter implements Flushable {
private static final Log LOG = LogFactory
.getLog(TimelineWriter.class);
@@ -68,6 +70,16 @@ public abstract class TimelineWriter {
// DO NOTHING
}
+ @Override
+ public void flush() throws IOException {
+ // DO NOTHING
+ }
+
+ @Override
+ public String toString() {
+ return "Timeline writer posting to " + resURI;
+ }
+
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
TimelineEntities entitiesContainer = new TimelineEntities();
@@ -104,19 +116,27 @@ public abstract class TimelineWriter {
}
});
} catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException)cause;
+ } else {
+ throw new IOException(cause);
+ }
} catch (InterruptedException ie) {
- throw new IOException(ie);
+ throw (IOException)new InterruptedIOException().initCause(ie);
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
- if (LOG.isDebugEnabled() && resp != null) {
- String output = resp.getEntity(String.class);
- LOG.debug("HTTP error code: " + resp.getStatus()
- + " Server response : \n" + output);
+ if (resp != null) {
+ msg += " HTTP error code: " + resp.getStatus();
+ if (LOG.isDebugEnabled()) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response : \n" + output);
+ }
}
throw new YarnException(msg);
}
@@ -128,10 +148,16 @@ public abstract class TimelineWriter {
public ClientResponse doPostingObject(Object object, String path) {
WebResource webResource = client.resource(resURI);
if (path == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("POST to " + resURI);
+ }
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, object);
} else if (path.equals("domain")) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PUT to " + resURI +"/" + path);
+ }
return webResource.path(path).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, object);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
index eb47ef2..e1e684b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
@@ -129,9 +129,9 @@ public class TimelineWebServices {
getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
- "windowStart, windowEnd or limit is not a numeric value.");
+ "windowStart, windowEnd, fromTs or limit is not a numeric value: " + e);
} catch (IllegalArgumentException e) {
- throw new BadRequestException("requested invalid field.");
+ throw new BadRequestException("requested invalid field: " + e);
} catch (Exception e) {
LOG.error("Error getting entities", e);
throw new WebApplicationException(e,
@@ -160,8 +160,7 @@ public class TimelineWebServices {
parseFieldsStr(fields, ","),
getUser(req));
} catch (IllegalArgumentException e) {
- throw new BadRequestException(
- "requested invalid field.");
+ throw new BadRequestException(e);
} catch (Exception e) {
LOG.error("Error getting entity", e);
throw new WebApplicationException(e,
@@ -201,8 +200,9 @@ public class TimelineWebServices {
parseLongStr(limit),
getUser(req));
} catch (NumberFormatException e) {
- throw new BadRequestException(
- "windowStart, windowEnd or limit is not a numeric value.");
+ throw (BadRequestException)new BadRequestException(
+ "windowStart, windowEnd or limit is not a numeric value.")
+ .initCause(e);
} catch (Exception e) {
LOG.error("Error getting entity timelines", e);
throw new WebApplicationException(e,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index efbf994..7eec7c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -107,30 +107,30 @@ public class EntityCacheItem {
store.init(config);
store.start();
}
- TimelineDataManager tdm = new TimelineDataManager(store,
- aclManager);
- tdm.init(config);
- tdm.start();
- List<LogInfo> removeList = new ArrayList<LogInfo>();
- for (LogInfo log : appLogs.getDetailLogs()) {
- LOG.debug("Try refresh logs for {}", log.getFilename());
- // Only refresh the log that matches the cache id
- if (log.matchesGroupId(groupId)) {
- Path appDirPath = appLogs.getAppDirPath();
- if (fs.exists(log.getPath(appDirPath))) {
- LOG.debug("Refresh logs for cache id {}", groupId);
- log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory,
- objMapper, fs);
- } else {
- // The log may have been removed, remove the log
- removeList.add(log);
- LOG.info("File {} no longer exists, remove it from log list",
- log.getPath(appDirPath));
+ List<LogInfo> removeList = new ArrayList<>();
+ try(TimelineDataManager tdm =
+ new TimelineDataManager(store, aclManager)) {
+ tdm.init(config);
+ tdm.start();
+ for (LogInfo log : appLogs.getDetailLogs()) {
+ LOG.debug("Try refresh logs for {}", log.getFilename());
+ // Only refresh the log that matches the cache id
+ if (log.matchesGroupId(groupId)) {
+ Path appDirPath = appLogs.getAppDirPath();
+ if (fs.exists(log.getPath(appDirPath))) {
+ LOG.debug("Refresh logs for cache id {}", groupId);
+ log.parseForStore(tdm, appDirPath, appLogs.isDone(),
+ jsonFactory, objMapper, fs);
+ } else {
+ // The log may have been removed, remove the log
+ removeList.add(log);
+ LOG.info("File {} no longer exists, removing it from log list",
+ log.getPath(appDirPath));
+ }
}
}
}
appLogs.getDetailLogs().removeAll(removeList);
- tdm.close();
}
updateRefreshTimeToNow();
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index b1fbd13..34a2072 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -55,6 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -71,12 +73,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Plugin timeline storage to support timeline server v1.5 API. This storage
* uses a file system to store timeline entities in their groups.
*/
-public class EntityGroupFSTimelineStore extends AbstractService
+public class EntityGroupFSTimelineStore extends CompositeService
implements TimelineStore {
static final String DOMAIN_LOG_PREFIX = "domainlog-";
@@ -110,6 +113,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
new ConcurrentHashMap<ApplicationId, AppLogs>();
private ScheduledThreadPoolExecutor executor;
+ private AtomicBoolean stopExecutors = new AtomicBoolean(false);
private FileSystem fs;
private ObjectMapper objMapper;
private JsonFactory jsonFactory;
@@ -128,7 +132,8 @@ public class EntityGroupFSTimelineStore extends AbstractService
@Override
protected void serviceInit(Configuration conf) throws Exception {
summaryStore = createSummaryStore();
- summaryStore.init(conf);
+ addService(summaryStore);
+
long logRetainSecs = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
YarnConfiguration
@@ -170,17 +175,28 @@ public class EntityGroupFSTimelineStore extends AbstractService
});
cacheIdPlugins = loadPlugIns(conf);
// Initialize yarn client for application status
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
+ yarnClient = createAndInitYarnClient(conf);
+ // if non-null, hook its lifecycle up
+ addIfService(yarnClient);
+ activeRootPath = new Path(conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+ doneRootPath = new Path(conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
+ fs = activeRootPath.getFileSystem(conf);
super.serviceInit(conf);
}
private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
throws RuntimeException {
- Collection<String> pluginNames = conf.getStringCollection(
+ Collection<String> pluginNames = conf.getTrimmedStringCollection(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
List<TimelineEntityGroupPlugin> pluginList
= new LinkedList<TimelineEntityGroupPlugin>();
+ Exception caught = null;
for (final String name : pluginNames) {
LOG.debug("Trying to load plugin class {}", name);
TimelineEntityGroupPlugin cacheIdPlugin = null;
@@ -191,10 +207,11 @@ public class EntityGroupFSTimelineStore extends AbstractService
clazz, conf);
} catch (Exception e) {
LOG.warn("Error loading plugin " + name, e);
+ caught = e;
}
if (cacheIdPlugin == null) {
- throw new RuntimeException("No class defined for " + name);
+ throw new RuntimeException("No class defined for " + name, caught);
}
LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
pluginList.add(cacheIdPlugin);
@@ -210,8 +227,9 @@ public class EntityGroupFSTimelineStore extends AbstractService
@Override
protected void serviceStart() throws Exception {
+
+ super.serviceStart();
LOG.info("Starting {}", getName());
- yarnClient.start();
summaryStore.start();
Configuration conf = getConfig();
@@ -219,16 +237,10 @@ public class EntityGroupFSTimelineStore extends AbstractService
aclManager.setTimelineStore(summaryStore);
summaryTdm = new TimelineDataManager(summaryStore, aclManager);
summaryTdm.init(conf);
- summaryTdm.start();
- activeRootPath = new Path(conf.get(
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
- YarnConfiguration
- .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
- doneRootPath = new Path(conf.get(
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
- YarnConfiguration
- .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
- fs = activeRootPath.getFileSystem(conf);
+ addService(summaryTdm);
+ // start child services that aren't already started
+ super.serviceStart();
+
if (!fs.exists(activeRootPath)) {
fs.mkdirs(activeRootPath);
fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
@@ -257,7 +269,8 @@ public class EntityGroupFSTimelineStore extends AbstractService
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
- LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
+ LOG.info("Scanning active directory {} every {} seconds", activeRootPath,
+ scanIntervalSecs);
LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
executor = new ScheduledThreadPoolExecutor(numThreads,
@@ -267,12 +280,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
cleanerIntervalSecs, TimeUnit.SECONDS);
- super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping {}", getName());
+ stopExecutors.set(true);
if (executor != null) {
executor.shutdown();
if (executor.isTerminating()) {
@@ -286,18 +299,9 @@ public class EntityGroupFSTimelineStore extends AbstractService
}
}
}
- if (summaryTdm != null) {
- summaryTdm.stop();
- }
- if (summaryStore != null) {
- summaryStore.stop();
- }
- if (yarnClient != null) {
- yarnClient.stop();
- }
synchronized (cachedLogs) {
for (EntityCacheItem cacheItem : cachedLogs.values()) {
- cacheItem.getStore().close();
+ ServiceOperations.stopQuietly(cacheItem.getStore());
}
}
super.serviceStop();
@@ -305,17 +309,34 @@ public class EntityGroupFSTimelineStore extends AbstractService
@InterfaceAudience.Private
@VisibleForTesting
- void scanActiveLogs() throws IOException {
- RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
+ int scanActiveLogs() throws IOException {
+ RemoteIterator<FileStatus> iter = list(activeRootPath);
+ int logsToScanCount = 0;
while (iter.hasNext()) {
FileStatus stat = iter.next();
- ApplicationId appId = parseApplicationId(stat.getPath().getName());
+ String name = stat.getPath().getName();
+ ApplicationId appId = parseApplicationId(name);
if (appId != null) {
LOG.debug("scan logs for {} in {}", appId, stat.getPath());
+ logsToScanCount++;
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
executor.execute(new ActiveLogParser(logs));
+ } else {
+ LOG.debug("Unable to parse entry {}", name);
}
}
+ return logsToScanCount;
+ }
+
+ /**
+ * List a directory, returning an iterator which reports no further entries
+ * once this service has been stopped.
+ * @param path path to list
+ * @return an iterator over the contents of the directory
+ * @throws IOException if the directory listing fails
+ */
+ private RemoteIterator<FileStatus> list(Path path) throws IOException {
+ return new StoppableRemoteIterator(fs.listStatusIterator(path));
}
private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
@@ -377,11 +398,11 @@ public class EntityGroupFSTimelineStore extends AbstractService
*/
@InterfaceAudience.Private
@VisibleForTesting
- static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
+ void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
throws IOException {
long now = Time.now();
// Depth first search from root directory for all application log dirs
- RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
+ RemoteIterator<FileStatus> iter = list(dirpath);
while (iter.hasNext()) {
FileStatus stat = iter.next();
if (stat.isDirectory()) {
@@ -456,7 +477,42 @@ public class EntityGroupFSTimelineStore extends AbstractService
bucket1, bucket2, appId.toString()));
}
- // This method has to be synchronized to control traffic to RM
+ /**
+ * Create and initialize the YARN Client. Tests may override/mock this.
+ * If they return null, then {@link #getAppState(ApplicationId)} MUST
+ * also be overridden
+ * @param conf configuration
+ * @return the yarn client, or null.
+ *
+ */
+ @VisibleForTesting
+ protected YarnClient createAndInitYarnClient(Configuration conf) {
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(conf);
+ return client;
+ }
+
+ /**
+ * Get the application state.
+ * @param appId application ID
+ * @return the state or {@link AppState#UNKNOWN} if it could not
+ * be determined
+ * @throws IOException on IO problems
+ */
+ @VisibleForTesting
+ protected AppState getAppState(ApplicationId appId) throws IOException {
+ return getAppState(appId, yarnClient);
+ }
+
+ /**
+ * Ask the RM for the state of the application.
+ * This method has to be synchronized to control traffic to RM
+ * @param appId application ID
+ * @param yarnClient YARN client used to query the RM
+ * @return the state or {@link AppState#UNKNOWN} if it could not
+ * be determined
+ * @throws IOException on IO problems
+ */
private static synchronized AppState getAppState(ApplicationId appId,
YarnClient yarnClient) throws IOException {
AppState appState = AppState.ACTIVE;
@@ -474,9 +530,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
return appState;
}
+ /**
+ * Application states.
+ */
@InterfaceAudience.Private
@VisibleForTesting
- enum AppState {
+ public enum AppState {
ACTIVE,
UNKNOWN,
COMPLETED
@@ -526,7 +585,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
if (!isDone()) {
LOG.debug("Try to parse summary log for log {} in {}",
appId, appDirPath);
- appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
+ appState = getAppState(appId);
long recentLogModTime = scanForLogs();
if (appState == AppState.UNKNOWN) {
if (Time.now() - recentLogModTime > unknownActiveMillis) {
@@ -559,8 +618,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
long scanForLogs() throws IOException {
LOG.debug("scanForLogs on {}", appDirPath);
long newestModTime = 0;
- RemoteIterator<FileStatus> iterAttempt =
- fs.listStatusIterator(appDirPath);
+ RemoteIterator<FileStatus> iterAttempt = list(appDirPath);
while (iterAttempt.hasNext()) {
FileStatus statAttempt = iterAttempt.next();
LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
@@ -572,8 +630,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
continue;
}
String attemptDirName = statAttempt.getPath().getName();
- RemoteIterator<FileStatus> iterCache
- = fs.listStatusIterator(statAttempt.getPath());
+ RemoteIterator<FileStatus> iterCache = list(statAttempt.getPath());
while (iterCache.hasNext()) {
FileStatus statCache = iterCache.next();
if (!statCache.isFile()) {
@@ -659,14 +716,34 @@ public class EntityGroupFSTimelineStore extends AbstractService
}
}
+ /**
+ * Extract any nested throwable forwarded from IPC operations.
+ * @param e exception
+ * @return either the exception passed as an argument, or any nested
+ * exception which was wrapped inside an {@link UndeclaredThrowableException}
+ */
+ private Throwable extract(Exception e) {
+ Throwable t = e;
+ if (e instanceof UndeclaredThrowableException && e.getCause() != null) {
+ t = e.getCause();
+ }
+ return t;
+ }
+
private class EntityLogScanner implements Runnable {
@Override
public void run() {
LOG.debug("Active scan starting");
try {
- scanActiveLogs();
+ int scanned = scanActiveLogs();
+ LOG.debug("Scanned {} active applications", scanned);
} catch (Exception e) {
- LOG.error("Error scanning active files", e);
+ Throwable t = extract(e);
+ if (t instanceof InterruptedException) {
+ LOG.info("File scanner interrupted");
+ } else {
+ LOG.error("Error scanning active files", t);
+ }
}
LOG.debug("Active scan complete");
}
@@ -690,7 +767,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
}
LOG.debug("End parsing summary logs. ");
} catch (Exception e) {
- LOG.error("Error processing logs for " + appLogs.getAppId(), e);
+ Throwable t = extract(e);
+ if (t instanceof InterruptedException) {
+ LOG.info("Log parser interrupted");
+ } else {
+ LOG.error("Error processing logs for " + appLogs.getAppId(), t);
+ }
}
}
}
@@ -702,7 +784,12 @@ public class EntityGroupFSTimelineStore extends AbstractService
try {
cleanLogs(doneRootPath, fs, logRetainMillis);
} catch (Exception e) {
- LOG.error("Error cleaning files", e);
+ Throwable t = extract(e);
+ if (t instanceof InterruptedException) {
+ LOG.info("Cleaner interrupted");
+ } else {
+ LOG.error("Error cleaning files", e);
+ }
}
LOG.debug("Cleaner finished");
}
@@ -892,4 +979,29 @@ public class EntityGroupFSTimelineStore extends AbstractService
public void put(TimelineDomain domain) throws IOException {
summaryStore.put(domain);
}
+
+ /**
+ * This is a special remote iterator whose {@link #hasNext()} method
+ * returns false if {@link #stopExecutors} is true.
+ * <p>
+ * This provides an implicit shutdown of all iterative file list and scan
+ * operations without needing to implement it in the while loops themselves.
+ */
+ private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
+ private final RemoteIterator<FileStatus> remote;
+
+ public StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
+ this.remote = remote;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return !stopExecutors.get() && remote.hasNext(); // short-circuit: the wrapped iterator is not queried once the stop flag is set
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ return remote.next(); // plain delegation; the stop flag is only consulted in hasNext()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
index 4caed8d..bc80175 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
@@ -103,7 +104,8 @@ abstract class LogInfo {
LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
attemptDirName);
Path logPath = getPath(appDirPath);
- if (fs.exists(logPath)) {
+ FileStatus status = fs.getFileStatus(logPath);
+ if (status != null) {
long startTime = Time.monotonicNow();
try {
LOG.debug("Parsing {} at offset {}", logPath, offset);
@@ -112,8 +114,11 @@ abstract class LogInfo {
LOG.info("Parsed {} entities from {} in {} msec",
count, logPath, Time.monotonicNow() - startTime);
} catch (RuntimeException e) {
- if (e.getCause() instanceof JsonParseException) {
- // If AppLogs cannot parse this log, it may be corrupted
+ // If AppLogs cannot parse this log, it may be corrupted or just empty
+ if (e.getCause() instanceof JsonParseException &&
+ (status.getLen() > 0 || offset > 0)) {
+ // log on parse problems if the file has been read in the past or
+ // is visibly non-empty
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d49cfb35/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index e43b705..3e5bc06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -116,14 +116,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
EntityGroupPlugInForTest.class.getName());
}
store.init(config);
- store.start();
store.setFs(fs);
+ store.start();
}
@After
public void tearDown() throws Exception {
- fs.delete(TEST_APP_DIR_PATH, true);
store.stop();
+ fs.delete(TEST_APP_DIR_PATH, true);
}
@AfterClass
@@ -222,7 +222,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
fs.mkdirs(dirPathEmpty);
// Should retain all logs after this run
- EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
+ store.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePath));
@@ -239,7 +239,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
// Touch the third application by creating a new dir
fs.mkdirs(new Path(dirPathHold, "holdByMe"));
- EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
+ store.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
// Verification after the second cleaner call
assertTrue(fs.exists(irrelevantDirPath));