Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/01/20 19:49:11 UTC
[04/24] hadoop git commit: YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe
YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02f597c5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02f597c5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02f597c5
Branch: refs/heads/HDFS-1312
Commit: 02f597c5db36ded385413958bdee793ad7eda40e
Parents: da77f42
Author: Junping Du <ju...@apache.org>
Authored: Sun Jan 17 17:37:40 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Sun Jan 17 17:37:40 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 62 +-
.../src/main/resources/yarn-default.xml | 58 ++
.../pom.xml | 11 +
.../ApplicationHistoryServer.java | 5 +-
.../server/timeline/TimelineDataManager.java | 1 -
.../timeline/TimelineDataManagerMetrics.java | 11 +-
.../TestApplicationHistoryClientService.java | 1 +
...pplicationHistoryManagerOnTimelineStore.java | 1 +
.../webapp/TestAHSWebServices.java | 1 +
.../timeline/TestTimelineDataManager.java | 1 +
.../pom.xml | 136 +++
.../yarn/server/timeline/EntityCacheItem.java | 170 ++++
.../timeline/EntityGroupFSTimelineStore.java | 895 +++++++++++++++++++
.../hadoop/yarn/server/timeline/LogInfo.java | 281 ++++++
.../timeline/TimelineEntityGroupPlugin.java | 74 ++
.../yarn/server/timeline/package-info.java | 23 +
.../timeline/EntityGroupPlugInForTest.java | 56 ++
.../server/timeline/PluginStoreTestUtils.java | 208 +++++
.../TestEntityGroupFSTimelineStore.java | 332 +++++++
.../yarn/server/timeline/TestLogInfo.java | 253 ++++++
.../hadoop-yarn/hadoop-yarn-server/pom.xml | 1 +
22 files changed, 2574 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9dbed10..d0266f3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -338,6 +338,9 @@ Release 2.8.0 - UNRELEASED
YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via
junping_du)
+ YARN-4265. Provide new timeline plugin storage to support fine-grained entity
+ caching. (Li Lu and Jason Lowe via junping_du)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d3c1f1a..23c2969 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1590,6 +1590,7 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_VERSION = TIMELINE_SERVICE_PREFIX
+ "version";
public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f;
+
/**
* Comma-separated list of names for UIs hosted in the timeline server
* (For pluggable UIs).
@@ -1625,17 +1626,70 @@ public class YarnConfiguration extends Configuration {
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
"/tmp/entity-file-history/active";
+ public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "done-dir";
public static final String
- TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
- TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT =
+ "/tmp/entity-file-history/done";
+
+ public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "group-id-plugin-classes";
+
public static final String
- DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
- "2000, 500";
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-store";
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "scan-interval-seconds";
+ public static final long
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT = 60;
+
+ public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "threads";
+ public static final int
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT = 16;
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE
+ = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "app-cache-size";
+ public static final int
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT = 10;
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "cleaner-interval-seconds";
+ public static final int
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT =
+ 60 * 60;
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS
+ = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retain-seconds";
+ public static final int
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT =
+ 7 * 24 * 60 * 60;
+
+ // how old the most recent log of an UNKNOWN app needs to be in the active
+ // directory before we treat it as COMPLETED
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "unknown-active-seconds";
+ public static final int
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
+ = 24 * 60 * 60;
+
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
+ public static final String
+ DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+ "2000, 500";
+
public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
public static final long
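For reference, a minimal sketch of how a consumer reads the new keys with their defaults. The constant names come from the diff above; the wrapper class and main() are illustrative only:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.yarn.conf.YarnConfiguration;

  // Illustrative reader for the new v1.5 store settings.
  public class PluginStoreConfigSketch {
    public static void main(String[] args) {
      Configuration conf = new YarnConfiguration();
      long scanIntervalSecs = conf.getLong(
          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT);
      int appCacheSize = conf.getInt(
          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT);
      System.out.println("scan every " + scanIntervalSecs
          + "s, cache up to " + appCacheSize + " entity groups");
    }
  }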
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 49cced6..6508a2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1978,6 +1978,64 @@
<value>${hadoop.tmp.dir}/yarn/timeline</value>
</property>
+ <!-- Timeline Service v1.5 Configuration -->
+
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.active-dir</name>
+ <value>/tmp/entity-file-history/active</value>
+ <description>HDFS path to store active application's timeline data</description>
+ </property>
+
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.done-dir</name>
+ <value>/tmp/entity-file-history/done/</value>
+ <description>HDFS path to store done application's timeline data</description>
+ </property>
+
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes</name>
+ <value></value>
+ <description>
+ Plugins that can translate a timeline entity read request into
+ a list of timeline entity group ids, separated by commas.
+ </description>
+ </property>
+
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.summary-store</name>
+ <description>Summary storage for ATS v1.5</description>
+ <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
+ </property>
+
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.scan-interval-seconds</name>
+ <description>
+ Scan interval for the ATS v1.5 entity group file system storage reader. This
+ value controls how frequently the reader will scan the HDFS active directory
+ for application status.
+ </description>
+ <value>60</value>
+ </property>
+
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.cleaner-interval-seconds</name>
+ <description>
+ Scan interval for the ATS v1.5 entity group file system storage cleaner. This
+ value controls how frequently the cleaner will scan the HDFS done directory
+ for stale application data.
+ </description>
+ <value>3600</value>
+ </property>
+
+ <property>
+ <name>yarn.timeline-service.entity-group-fs-store.retain-seconds</name>
+ <description>
+ How long the ATS v1.5 entity group file system storage will keep an
+ application's data in the done directory.
+ </description>
+ <value>604800</value>
+ </property>
+
<!-- Shared Cache Configuration -->
<property>
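Putting the pieces together, a deployment sketch as a code fragment: the store-class and version keys below are long-standing timeline-service settings rather than part of this diff, and com.example.MyGroupIdPlugin is a placeholder for a real TimelineEntityGroupPlugin implementation:

  Configuration conf = new YarnConfiguration();
  // Select the plugin storage added by this commit.
  conf.set("yarn.timeline-service.store-class",
      "org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore");
  conf.setFloat("yarn.timeline-service.version", 1.5f);
  // Placeholder plugin class; deployments supply their own implementation.
  conf.set("yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes",
      "com.example.MyGroupIdPlugin");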
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 9748374..d9bc05e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -220,6 +220,17 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
index 160ad8d..f4fe140 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -228,8 +228,9 @@ public class ApplicationHistoryServer extends CompositeService {
}
private TimelineDataManager createTimelineDataManager(Configuration conf) {
- return new TimelineDataManager(
- timelineStore, new TimelineACLsManager(conf));
+ TimelineACLsManager aclsMgr = new TimelineACLsManager(conf);
+ aclsMgr.setTimelineStore(timelineStore);
+ return new TimelineDataManager(timelineStore, aclsMgr);
}
@SuppressWarnings("unchecked")
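This moves the store/ACL wiring out of TimelineDataManager's constructor (see the TimelineDataManager diff below), so whoever creates a TimelineDataManager must now set the store on the ACLs manager itself; the test diffs below follow the same pattern. A minimal sketch of the new wiring, using MemoryTimelineStore as a stand-in store:

  Configuration conf = new YarnConfiguration();
  TimelineStore store = new MemoryTimelineStore();   // stand-in store
  TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
  aclsManager.setTimelineStore(store);               // now the caller's job
  TimelineDataManager dataManager = new TimelineDataManager(store, aclsManager);
  dataManager.init(conf);
  dataManager.start();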
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
index 23ff8e4..57a9346 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
@@ -67,7 +67,6 @@ public class TimelineDataManager extends AbstractService {
super(TimelineDataManager.class.getName());
this.store = store;
this.timelineACLsManager = timelineACLsManager;
- timelineACLsManager.setTimelineStore(store);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
index afd5818..3591b39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
@@ -92,12 +92,17 @@ public class TimelineDataManagerMetrics {
getDomainsOps.value();
}
+ private static TimelineDataManagerMetrics instance = null;
+
TimelineDataManagerMetrics() {
}
- public static TimelineDataManagerMetrics create() {
- MetricsSystem ms = DefaultMetricsSystem.instance();
- return ms.register(new TimelineDataManagerMetrics());
+ public static synchronized TimelineDataManagerMetrics create() {
+ if (instance == null) {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ instance = ms.register(new TimelineDataManagerMetrics());
+ }
+ return instance;
}
public void incrGetEntitiesOps() {
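Caching the registered instance makes create() idempotent: code that constructs several TimelineDataManager instances, as EntityCacheItem.refreshCache() later in this patch does, no longer re-registers the same metrics source, which the metrics system may reject as a duplicate. A sketch of the expected behavior:

  TimelineDataManagerMetrics a = TimelineDataManagerMetrics.create();
  TimelineDataManagerMetrics b = TimelineDataManagerMetrics.create();
  assert a == b;  // one shared instance, registered once (enable with -ea)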
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
index 1e98e8d..7ef6eca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
@@ -65,6 +65,7 @@ public class TestApplicationHistoryClientService {
TimelineStore store =
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
+ aclsManager.setTimelineStore(store);
dataManager =
new TimelineDataManager(store, aclsManager);
dataManager.init(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
index a669f37..dfc5b81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
@@ -96,6 +96,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
public void setup() throws Exception {
// Only test the ACLs of the generic history
TimelineACLsManager aclsManager = new TimelineACLsManager(new YarnConfiguration());
+ aclsManager.setTimelineStore(store);
TimelineDataManager dataManager =
new TimelineDataManager(store, aclsManager);
dataManager.init(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index f2179b4..20dfe45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -90,6 +90,7 @@ public class TestAHSWebServices extends JerseyTestBase {
TimelineStore store =
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
+ aclsManager.setTimelineStore(store);
TimelineDataManager dataManager =
new TimelineDataManager(store, aclsManager);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
index ace2eb8..8fba54c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
@@ -62,6 +62,7 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils {
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
aclsManager = new TimelineACLsManager(conf);
+ aclsManager.setTimelineStore(store);
dataManaer = new TimelineDataManager(store, aclsManager);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
new file mode 100644
index 0000000..385ba5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hadoop-yarn-server</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>3.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <name>Apache Hadoop YARN Timeline Plugin Storage</name>
+
+ <properties>
+ <!-- Needed for generating FindBugs warnings using parent pom -->
+ <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+ </properties>
+
+ <dependencies>
+ <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-xc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
new file mode 100644
index 0000000..37a1d8d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Cache item for timeline server v1.5 reader cache. Each cache item has a
+ * TimelineStore that can be filled with data within one entity group.
+ */
+public class EntityCacheItem {
+ private static final Logger LOG
+ = LoggerFactory.getLogger(EntityCacheItem.class);
+
+ private TimelineStore store;
+ private EntityGroupFSTimelineStore.AppLogs appLogs;
+ private long lastRefresh;
+ private Configuration config;
+ private FileSystem fs;
+
+ public EntityCacheItem(Configuration config, FileSystem fs) {
+ this.config = config;
+ this.fs = fs;
+ }
+
+ /**
+ * @return The application logs associated with this cache item; may be null.
+ */
+ public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() {
+ return this.appLogs;
+ }
+
+ /**
+ * Set the application logs to this cache item. The entity group should be
+ * associated with this application.
+ *
+ * @param incomingAppLogs the application logs to associate with this item
+ */
+ public synchronized void setAppLogs(
+ EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
+ this.appLogs = incomingAppLogs;
+ }
+
+ /**
+ * @return The timeline store, either loaded or unloaded, of this cache item.
+ */
+ public synchronized TimelineStore getStore() {
+ return store;
+ }
+
+ /**
+ * Refresh this cache item if it needs refresh. This will enforce an appLogs
+ * rescan and then load new data. The refresh process is synchronized with
+ * other operations on the same cache item.
+ *
+ * @param groupId the entity group id to load entities for
+ * @param aclManager ACLs manager for the internal timeline data manager
+ * @param jsonFactory JSON factory used when parsing the log files
+ * @param objMapper object mapper used when parsing the log files
+ * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
+ * object filled with all entities in the group.
+ * @throws IOException
+ */
+ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
+ TimelineACLsManager aclManager, JsonFactory jsonFactory,
+ ObjectMapper objMapper) throws IOException {
+ if (needRefresh()) {
+ // If an application is not finished, we only update summary logs (and put
+ // new entities into summary storage).
+ // Otherwise, since the application is done, we can update detail logs.
+ if (!appLogs.isDone()) {
+ appLogs.parseSummaryLogs();
+ } else if (appLogs.getDetailLogs().isEmpty()) {
+ appLogs.scanForLogs();
+ }
+ if (!appLogs.getDetailLogs().isEmpty()) {
+ if (store == null) {
+ store = new MemoryTimelineStore();
+ store.init(config);
+ store.start();
+ }
+ TimelineDataManager tdm = new TimelineDataManager(store,
+ aclManager);
+ tdm.init(config);
+ tdm.start();
+ List<LogInfo> removeList = new ArrayList<LogInfo>();
+ for (LogInfo log : appLogs.getDetailLogs()) {
+ LOG.debug("Try refresh logs for {}", log.getFilename());
+ // Only refresh the log that matches the cache id
+ if (log.matchesGroupId(groupId)) {
+ Path appDirPath = appLogs.getAppDirPath();
+ if (fs.exists(log.getPath(appDirPath))) {
+ LOG.debug("Refresh logs for cache id {}", groupId);
+ log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory,
+ objMapper, fs);
+ } else {
+ // The log may have been removed; drop it from the log list
+ removeList.add(log);
+ LOG.info("File {} no longer exists, remove it from log list",
+ log.getPath(appDirPath));
+ }
+ }
+ }
+ appLogs.getDetailLogs().removeAll(removeList);
+ tdm.close();
+ }
+ updateRefreshTimeToNow();
+ } else {
+ LOG.debug("Cache new enough, skip refreshing");
+ }
+ return store;
+ }
+
+ /**
+ * Release the cache item for the given group id.
+ *
+ * @param groupId the entity group id whose cached data should be released
+ */
+ public synchronized void releaseCache(TimelineEntityGroupId groupId) {
+ try {
+ if (store != null) {
+ store.close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Error closing timeline store", e);
+ }
+ store = null;
+ // reset offsets so next time logs are re-parsed
+ for (LogInfo log : appLogs.getDetailLogs()) {
+ if (log.getFilename().contains(groupId.toString())) {
+ log.setOffset(0);
+ }
+ }
+ }
+
+ private boolean needRefresh() {
+ return (Time.monotonicNow() - lastRefresh > 10000);
+ }
+
+ private void updateRefreshTimeToNow() {
+ this.lastRefresh = Time.monotonicNow();
+ }
+}
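A hedged usage sketch, written as a fragment assuming the same org.apache.hadoop.yarn.server.timeline package (the real call site is EntityGroupFSTimelineStore's cached-store lookup): attach the application's logs, then refreshCache() re-parses any detail logs matching the group when the item is stale, older than roughly 10 seconds per needRefresh().

  // Fragment; the parameters mirror fields that EntityGroupFSTimelineStore keeps.
  TimelineStore readGroup(Configuration conf, FileSystem fs,
      EntityGroupFSTimelineStore.AppLogs appLogs,
      TimelineEntityGroupId groupId, TimelineACLsManager aclManager,
      JsonFactory jsonFactory, ObjectMapper objMapper) throws IOException {
    EntityCacheItem item = new EntityCacheItem(conf, fs);
    item.setAppLogs(appLogs);  // the group must belong to this application
    // Loads matching detail logs into an in-memory store for this group.
    return item.refreshCache(groupId, aclManager, jsonFactory, objMapper);
  }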
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
new file mode 100644
index 0000000..b1fbd13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -0,0 +1,895 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.MappingJsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Plugin timeline storage to support timeline server v1.5 API. This storage
+ * uses a file system to store timeline entities in their groups.
+ */
+public class EntityGroupFSTimelineStore extends AbstractService
+ implements TimelineStore {
+
+ static final String DOMAIN_LOG_PREFIX = "domainlog-";
+ static final String SUMMARY_LOG_PREFIX = "summarylog-";
+ static final String ENTITY_LOG_PREFIX = "entitylog-";
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ EntityGroupFSTimelineStore.class);
+ private static final FsPermission ACTIVE_DIR_PERMISSION =
+ new FsPermission((short) 01777);
+ private static final FsPermission DONE_DIR_PERMISSION =
+ new FsPermission((short) 0700);
+
+ private static final EnumSet<YarnApplicationState>
+ APP_FINAL_STATES = EnumSet.of(
+ YarnApplicationState.FAILED,
+ YarnApplicationState.KILLED,
+ YarnApplicationState.FINISHED);
+ // Active dir: <activeRoot>/appId/attemptId/cacheId.log
+ // Done dir: <doneRoot>/cluster_ts/hash1/hash2/appId/attemptId/cacheId.log
+ private static final String APP_DONE_DIR_PREFIX_FORMAT =
+ "%d" + Path.SEPARATOR // cluster timestamp
+ + "%04d" + Path.SEPARATOR // app num / 1,000,000
+ + "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
+ + "%s" + Path.SEPARATOR; // full app id
+
+ private YarnClient yarnClient;
+ private TimelineStore summaryStore;
+ private TimelineACLsManager aclManager;
+ private TimelineDataManager summaryTdm;
+ private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
+ new ConcurrentHashMap<ApplicationId, AppLogs>();
+ private ScheduledThreadPoolExecutor executor;
+ private FileSystem fs;
+ private ObjectMapper objMapper;
+ private JsonFactory jsonFactory;
+ private Path activeRootPath;
+ private Path doneRootPath;
+ private long logRetainMillis;
+ private long unknownActiveMillis;
+ private int appCacheMaxSize = 0;
+ private List<TimelineEntityGroupPlugin> cacheIdPlugins;
+ private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
+
+ public EntityGroupFSTimelineStore() {
+ super(EntityGroupFSTimelineStore.class.getSimpleName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ summaryStore = createSummaryStore();
+ summaryStore.init(conf);
+ long logRetainSecs = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT);
+ logRetainMillis = logRetainSecs * 1000;
+ LOG.info("Cleaner set to delete logs older than {} seconds", logRetainSecs);
+ long unknownActiveSecs = conf.getLong(
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS,
+ YarnConfiguration.
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
+ );
+ unknownActiveMillis = unknownActiveSecs * 1000;
+ LOG.info("Unknown apps will be treated as complete after {} seconds",
+ unknownActiveSecs);
+ appCacheMaxSize = conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT);
+ LOG.info("Application cache size is {}", appCacheMaxSize);
+ cachedLogs = Collections.synchronizedMap(
+ new LinkedHashMap<TimelineEntityGroupId, EntityCacheItem>(
+ appCacheMaxSize + 1, 0.75f, true) {
+ @Override
+ protected boolean removeEldestEntry(
+ Map.Entry<TimelineEntityGroupId, EntityCacheItem> eldest) {
+ if (super.size() > appCacheMaxSize) {
+ TimelineEntityGroupId groupId = eldest.getKey();
+ LOG.debug("Evicting {} due to space limitations", groupId);
+ EntityCacheItem cacheItem = eldest.getValue();
+ cacheItem.releaseCache(groupId);
+ if (cacheItem.getAppLogs().isDone()) {
+ appIdLogMap.remove(groupId.getApplicationId());
+ }
+ return true;
+ }
+ return false;
+ }
+ });
+ cacheIdPlugins = loadPlugIns(conf);
+ // Initialize yarn client for application status
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ super.serviceInit(conf);
+ }
+
+ private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
+ throws RuntimeException {
+ Collection<String> pluginNames = conf.getStringCollection(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
+ List<TimelineEntityGroupPlugin> pluginList
+ = new LinkedList<TimelineEntityGroupPlugin>();
+ for (final String name : pluginNames) {
+ LOG.debug("Trying to load plugin class {}", name);
+ TimelineEntityGroupPlugin cacheIdPlugin = null;
+ try {
+ Class<?> clazz = conf.getClassByName(name);
+ cacheIdPlugin =
+ (TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
+ clazz, conf);
+ } catch (Exception e) {
+ LOG.warn("Error loading plugin " + name, e);
+ }
+
+ if (cacheIdPlugin == null) {
+ throw new RuntimeException("No class defined for " + name);
+ }
+ LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
+ pluginList.add(cacheIdPlugin);
+ }
+ return pluginList;
+ }
+
+ private TimelineStore createSummaryStore() {
+ return ReflectionUtils.newInstance(getConfig().getClass(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE,
+ LeveldbTimelineStore.class, TimelineStore.class), getConfig());
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting {}", getName());
+ yarnClient.start();
+ summaryStore.start();
+
+ Configuration conf = getConfig();
+ aclManager = new TimelineACLsManager(conf);
+ aclManager.setTimelineStore(summaryStore);
+ summaryTdm = new TimelineDataManager(summaryStore, aclManager);
+ summaryTdm.init(conf);
+ summaryTdm.start();
+ activeRootPath = new Path(conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+ doneRootPath = new Path(conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
+ fs = activeRootPath.getFileSystem(conf);
+ if (!fs.exists(activeRootPath)) {
+ fs.mkdirs(activeRootPath);
+ fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
+ }
+ if (!fs.exists(doneRootPath)) {
+ fs.mkdirs(doneRootPath);
+ fs.setPermission(doneRootPath, DONE_DIR_PERMISSION);
+ }
+
+ objMapper = new ObjectMapper();
+ objMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+ jsonFactory = new MappingJsonFactory(objMapper);
+ final long scanIntervalSecs = conf.getLong(
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
+ );
+ final long cleanerIntervalSecs = conf.getLong(
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT
+ );
+ final int numThreads = conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
+ YarnConfiguration
+ .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
+ LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
+ LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
+
+ executor = new ScheduledThreadPoolExecutor(numThreads,
+ new ThreadFactoryBuilder().setNameFormat("EntityLogPluginWorker #%d")
+ .build());
+ executor.scheduleAtFixedRate(new EntityLogScanner(), 0, scanIntervalSecs,
+ TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
+ cleanerIntervalSecs, TimeUnit.SECONDS);
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping {}", getName());
+ if (executor != null) {
+ executor.shutdown();
+ if (executor.isTerminating()) {
+ LOG.info("Waiting for executor to terminate");
+ boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (terminated) {
+ LOG.info("Executor terminated");
+ } else {
+ LOG.warn("Executor did not terminate");
+ executor.shutdownNow();
+ }
+ }
+ }
+ if (summaryTdm != null) {
+ summaryTdm.stop();
+ }
+ if (summaryStore != null) {
+ summaryStore.stop();
+ }
+ if (yarnClient != null) {
+ yarnClient.stop();
+ }
+ synchronized (cachedLogs) {
+ for (EntityCacheItem cacheItem : cachedLogs.values()) {
+ cacheItem.getStore().close();
+ }
+ }
+ super.serviceStop();
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ void scanActiveLogs() throws IOException {
+ RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
+ while (iter.hasNext()) {
+ FileStatus stat = iter.next();
+ ApplicationId appId = parseApplicationId(stat.getPath().getName());
+ if (appId != null) {
+ LOG.debug("scan logs for {} in {}", appId, stat.getPath());
+ AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
+ executor.execute(new ActiveLogParser(logs));
+ }
+ }
+ }
+
+ private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
+ Path appDirPath, AppState appState) {
+ AppLogs appLogs = new AppLogs(appId, appDirPath, appState);
+ AppLogs oldAppLogs = appIdLogMap.putIfAbsent(appId, appLogs);
+ if (oldAppLogs != null) {
+ appLogs = oldAppLogs;
+ }
+ return appLogs;
+ }
+
+ private AppLogs getAndSetActiveLog(ApplicationId appId, Path appDirPath) {
+ AppLogs appLogs = appIdLogMap.get(appId);
+ if (appLogs == null) {
+ appLogs = createAndPutAppLogsIfAbsent(appId, appDirPath, AppState.ACTIVE);
+ }
+ return appLogs;
+ }
+
+ // searches for the app logs and returns it if found, else null
+ private AppLogs getAndSetAppLogs(ApplicationId applicationId)
+ throws IOException {
+ LOG.debug("Looking for app logs mapped for app id {}", applicationId);
+ AppLogs appLogs = appIdLogMap.get(applicationId);
+ if (appLogs == null) {
+ AppState appState = AppState.UNKNOWN;
+ Path appDirPath = getDoneAppPath(applicationId);
+ if (fs.exists(appDirPath)) {
+ appState = AppState.COMPLETED;
+ } else {
+ appDirPath = getActiveAppPath(applicationId);
+ if (fs.exists(appDirPath)) {
+ appState = AppState.ACTIVE;
+ }
+ }
+ if (appState != AppState.UNKNOWN) {
+ LOG.debug("Create and try to add new appLogs to appIdLogMap for {}",
+ applicationId);
+ appLogs = createAndPutAppLogsIfAbsent(
+ applicationId, appDirPath, appState);
+ }
+ }
+ return appLogs;
+ }
+
+ /**
+ * Main function for entity log cleaner. This method performs depth first
+ * search from a given dir path for all application log dirs. Once found, it
+ * will decide if the directory should be cleaned up and then clean them.
+ *
+ * @param dirpath the root directory the cleaner should start with. Note that
+ * dirpath should be a directory that contains a set of
+ * application log directories. The cleaner method will not
+ * work if the given dirpath itself is an application log dir.
+ * @param fs the file system holding the log directories
+ * @param retainMillis retention time for application log data, in milliseconds
+ * @throws IOException
+ */
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
+ throws IOException {
+ long now = Time.now();
+ // Depth first search from root directory for all application log dirs
+ RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
+ while (iter.hasNext()) {
+ FileStatus stat = iter.next();
+ if (stat.isDirectory()) {
+ // If current is an application log dir, decide if we need to remove it
+ // and remove if necessary.
+ // Otherwise, keep iterating into it.
+ ApplicationId appId = parseApplicationId(dirpath.getName());
+ if (appId != null) { // Application log dir
+ if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) {
+ try {
+ LOG.info("Deleting {}", dirpath);
+ if (!fs.delete(dirpath, true)) {
+ LOG.error("Unable to remove " + dirpath);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to remove " + dirpath, e);
+ }
+ }
+ } else { // Keep cleaning inside
+ cleanLogs(stat.getPath(), fs, retainMillis);
+ }
+ }
+ }
+ }
+
+ private static boolean shouldCleanAppLogDir(Path appLogPath, long now,
+ FileSystem fs, long logRetainMillis) throws IOException {
+ RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath);
+ while (iter.hasNext()) {
+ FileStatus stat = iter.next();
+ if (now - stat.getModificationTime() <= logRetainMillis) {
+ // found a dir entry that is fresh enough to prevent
+ // cleaning this directory.
+ LOG.debug("{} not being cleaned due to {}", appLogPath, stat.getPath());
+ return false;
+ }
+ // Otherwise, keep searching files inside for directories.
+ if (stat.isDirectory()) {
+ if (!shouldCleanAppLogDir(stat.getPath(), now, fs, logRetainMillis)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ // converts the String to an ApplicationId or null if conversion failed
+ private static ApplicationId parseApplicationId(String appIdStr) {
+ ApplicationId appId = null;
+ if (appIdStr.startsWith(ApplicationId.appIdStrPrefix)) {
+ try {
+ appId = ConverterUtils.toApplicationId(appIdStr);
+ } catch (IllegalArgumentException e) {
+ appId = null;
+ }
+ }
+ return appId;
+ }
+
+ private Path getActiveAppPath(ApplicationId appId) {
+ return new Path(activeRootPath, appId.toString());
+ }
+
+ private Path getDoneAppPath(ApplicationId appId) {
+ // cut up the app ID into mod(1000) buckets
+ int appNum = appId.getId();
+ appNum /= 1000;
+ int bucket2 = appNum % 1000;
+ int bucket1 = appNum / 1000;
+ return new Path(doneRootPath,
+ String.format(APP_DONE_DIR_PREFIX_FORMAT, appId.getClusterTimestamp(),
+ bucket1, bucket2, appId.toString()));
+ }
+
+ // This method has to be synchronized to control traffic to RM
+ private static synchronized AppState getAppState(ApplicationId appId,
+ YarnClient yarnClient) throws IOException {
+ AppState appState = AppState.ACTIVE;
+ try {
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+ YarnApplicationState yarnState = report.getYarnApplicationState();
+ if (APP_FINAL_STATES.contains(yarnState)) {
+ appState = AppState.COMPLETED;
+ }
+ } catch (ApplicationNotFoundException e) {
+ appState = AppState.UNKNOWN;
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ return appState;
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ enum AppState {
+ ACTIVE,
+ UNKNOWN,
+ COMPLETED
+ }
+
+ class AppLogs {
+ private ApplicationId appId;
+ private Path appDirPath;
+ private AppState appState;
+ private List<LogInfo> summaryLogs = new ArrayList<LogInfo>();
+ private List<LogInfo> detailLogs = new ArrayList<LogInfo>();
+
+ public AppLogs(ApplicationId appId, Path appPath, AppState state) {
+ this.appId = appId;
+ appDirPath = appPath;
+ appState = state;
+ }
+
+ public synchronized boolean isDone() {
+ return appState == AppState.COMPLETED;
+ }
+
+ public synchronized ApplicationId getAppId() {
+ return appId;
+ }
+
+ public synchronized Path getAppDirPath() {
+ return appDirPath;
+ }
+
+ synchronized List<LogInfo> getSummaryLogs() {
+ return summaryLogs;
+ }
+
+ synchronized List<LogInfo> getDetailLogs() {
+ return detailLogs;
+ }
+
+ public synchronized void parseSummaryLogs() throws IOException {
+ parseSummaryLogs(summaryTdm);
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ synchronized void parseSummaryLogs(TimelineDataManager tdm)
+ throws IOException {
+ if (!isDone()) {
+ LOG.debug("Try to parse summary log for log {} in {}",
+ appId, appDirPath);
+ appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
+ long recentLogModTime = scanForLogs();
+ if (appState == AppState.UNKNOWN) {
+ if (Time.now() - recentLogModTime > unknownActiveMillis) {
+ LOG.info(
+ "{} state is UNKNOWN and logs are stale, assuming COMPLETED",
+ appId);
+ appState = AppState.COMPLETED;
+ }
+ }
+ }
+ List<LogInfo> removeList = new ArrayList<LogInfo>();
+ for (LogInfo log : summaryLogs) {
+ if (fs.exists(log.getPath(appDirPath))) {
+ log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
+ objMapper, fs);
+ } else {
+ // The log may have been removed; drop it from the log list
+ removeList.add(log);
+ LOG.info("File {} no longer exists, remove it from log list",
+ log.getPath(appDirPath));
+ }
+ }
+ summaryLogs.removeAll(removeList);
+ }
+
+ // scans for new logs and returns the modification timestamp of the
+ // most recently modified log
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ long scanForLogs() throws IOException {
+ LOG.debug("scanForLogs on {}", appDirPath);
+ long newestModTime = 0;
+ RemoteIterator<FileStatus> iterAttempt =
+ fs.listStatusIterator(appDirPath);
+ while (iterAttempt.hasNext()) {
+ FileStatus statAttempt = iterAttempt.next();
+ LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
+ if (!statAttempt.isDirectory()
+ || !statAttempt.getPath().getName()
+ .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ LOG.debug("Scanner skips for unknown dir/file {}",
+ statAttempt.getPath());
+ continue;
+ }
+ String attemptDirName = statAttempt.getPath().getName();
+ RemoteIterator<FileStatus> iterCache
+ = fs.listStatusIterator(statAttempt.getPath());
+ while (iterCache.hasNext()) {
+ FileStatus statCache = iterCache.next();
+ if (!statCache.isFile()) {
+ continue;
+ }
+ String filename = statCache.getPath().getName();
+ // We should only update time for log files.
+ boolean shouldSetTime = true;
+ LOG.debug("scan for log file: {}", filename);
+ if (filename.startsWith(DOMAIN_LOG_PREFIX)) {
+ addSummaryLog(attemptDirName, filename, statCache.getOwner(), true);
+ } else if (filename.startsWith(SUMMARY_LOG_PREFIX)) {
+ addSummaryLog(attemptDirName, filename, statCache.getOwner(),
+ false);
+ } else if (filename.startsWith(ENTITY_LOG_PREFIX)) {
+ addDetailLog(attemptDirName, filename, statCache.getOwner());
+ } else {
+ shouldSetTime = false;
+ }
+ if (shouldSetTime) {
+ newestModTime
+ = Math.max(statCache.getModificationTime(), newestModTime);
+ }
+ }
+ }
+
+ // if there are no logs in the directory then use the modification
+ // time of the directory itself
+ if (newestModTime == 0) {
+ newestModTime = fs.getFileStatus(appDirPath).getModificationTime();
+ }
+
+ return newestModTime;
+ }
+
+ private void addSummaryLog(String attemptDirName,
+ String filename, String owner, boolean isDomainLog) {
+ for (LogInfo log : summaryLogs) {
+ if (log.getFilename().equals(filename)
+ && log.getAttemptDirName().equals(attemptDirName)) {
+ return;
+ }
+ }
+ LOG.debug("Incoming log {} not present in my summaryLogs list, add it",
+ filename);
+ LogInfo log;
+ if (isDomainLog) {
+ log = new DomainLogInfo(attemptDirName, filename, owner);
+ } else {
+ log = new EntityLogInfo(attemptDirName, filename, owner);
+ }
+ summaryLogs.add(log);
+ }
+
+ private void addDetailLog(String attemptDirName, String filename,
+ String owner) {
+ for (LogInfo log : detailLogs) {
+ if (log.getFilename().equals(filename)
+ && log.getAttemptDirName().equals(attemptDirName)) {
+ return;
+ }
+ }
+ detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
+ }
+
+ public synchronized void moveToDone() throws IOException {
+ Path doneAppPath = getDoneAppPath(appId);
+ if (!doneAppPath.equals(appDirPath)) {
+ Path donePathParent = doneAppPath.getParent();
+ if (!fs.exists(donePathParent)) {
+ fs.mkdirs(donePathParent);
+ }
+ LOG.debug("Application {} is done, trying to move to done dir {}",
+ appId, doneAppPath);
+ if (!fs.rename(appDirPath, doneAppPath)) {
+ throw new IOException("Rename " + appDirPath + " to " + doneAppPath
+ + " failed");
+ } else {
+ LOG.info("Moved {} to {}", appDirPath, doneAppPath);
+ }
+ appDirPath = doneAppPath;
+ }
+ }
+ }
+
+ private class EntityLogScanner implements Runnable {
+ @Override
+ public void run() {
+ LOG.debug("Active scan starting");
+ try {
+ scanActiveLogs();
+ } catch (Exception e) {
+ LOG.error("Error scanning active files", e);
+ }
+ LOG.debug("Active scan complete");
+ }
+ }
+
+ private class ActiveLogParser implements Runnable {
+ private AppLogs appLogs;
+
+ public ActiveLogParser(AppLogs logs) {
+ appLogs = logs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.debug("Begin parsing summary logs. ");
+ appLogs.parseSummaryLogs();
+ if (appLogs.isDone()) {
+ appLogs.moveToDone();
+ appIdLogMap.remove(appLogs.getAppId());
+ }
+ LOG.debug("End parsing summary logs. ");
+ } catch (Exception e) {
+ LOG.error("Error processing logs for " + appLogs.getAppId(), e);
+ }
+ }
+ }
+
+ private class EntityLogCleaner implements Runnable {
+ @Override
+ public void run() {
+ LOG.debug("Cleaner starting");
+ try {
+ cleanLogs(doneRootPath, fs, logRetainMillis);
+ } catch (Exception e) {
+ LOG.error("Error cleaning files", e);
+ }
+ LOG.debug("Cleaner finished");
+ }
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ void setFs(FileSystem incomingFs) {
+ this.fs = incomingFs;
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
+ cachedLogs.put(groupId, cacheItem);
+ }
+
+ private List<TimelineStore> getTimelineStoresFromCacheIds(
+ Set<TimelineEntityGroupId> groupIds, String entityType)
+ throws IOException {
+ List<TimelineStore> stores = new LinkedList<TimelineStore>();
+ // For now we just handle one store in a context. We return every
+ // non-null store resolved from the group ids; if none resolve, we
+ // fall back to the summary store.
+ for (TimelineEntityGroupId groupId : groupIds) {
+ TimelineStore storeForId = getCachedStore(groupId);
+ if (storeForId != null) {
+ LOG.debug("Adding {} as a store for the query", storeForId.getName());
+ stores.add(storeForId);
+ }
+ }
+ if (stores.isEmpty()) {
+ LOG.debug("Using summary store for {}", entityType);
+ stores.add(this.summaryStore);
+ }
+ return stores;
+ }
+
+ private List<TimelineStore> getTimelineStoresForRead(String entityId,
+ String entityType) throws IOException {
+ Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
+ for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
+ LOG.debug("Trying plugin {} for id {} and type {}",
+ cacheIdPlugin.getClass().getName(), entityId, entityType);
+ Set<TimelineEntityGroupId> idsFromPlugin
+ = cacheIdPlugin.getTimelineEntityGroupId(entityId, entityType);
+ if (idsFromPlugin == null) {
+ LOG.debug("Plugin {} returned null",
+ cacheIdPlugin.getClass().getName());
+ } else {
+ LOG.debug("Plugin {} returned ids: {}",
+ cacheIdPlugin.getClass().getName(), idsFromPlugin);
+ groupIds.addAll(idsFromPlugin);
+ }
+ }
+ return getTimelineStoresFromCacheIds(groupIds, entityType);
+ }
+
+ private List<TimelineStore> getTimelineStoresForRead(String entityType,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
+ throws IOException {
+ Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
+ for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
+ Set<TimelineEntityGroupId> idsFromPlugin =
+ cacheIdPlugin.getTimelineEntityGroupId(entityType, primaryFilter,
+ secondaryFilters);
+ if (idsFromPlugin != null) {
+ LOG.debug("plugin {} returns a non-null value on query {}",
+ cacheIdPlugin.getClass().getName(), idsFromPlugin);
+ groupIds.addAll(idsFromPlugin);
+ }
+ }
+ return getTimelineStoresFromCacheIds(groupIds, entityType);
+ }
+
+ // find a cached timeline store or null if it cannot be located
+ private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
+ throws IOException {
+ EntityCacheItem cacheItem;
+ synchronized (this.cachedLogs) {
+ // Note that the content in the cache log storage may be stale.
+ cacheItem = this.cachedLogs.get(groupId);
+ if (cacheItem == null) {
+ LOG.debug("Set up new cache item for id {}", groupId);
+ cacheItem = new EntityCacheItem(getConfig(), fs);
+ AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
+ if (appLogs != null) {
+ LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
+ cacheItem.setAppLogs(appLogs);
+ this.cachedLogs.put(groupId, cacheItem);
+ } else {
+ LOG.warn("AppLogs for groupId {} is set to null!", groupId);
+ }
+ }
+ }
+ TimelineStore store = null;
+ if (cacheItem.getAppLogs() != null) {
+ AppLogs appLogs = cacheItem.getAppLogs();
+ LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
+ store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
+ objMapper);
+ } else {
+ LOG.warn("AppLogs for group id {} is null", groupId);
+ }
+ return store;
+ }
+
+ @Override
+ public TimelineEntities getEntities(String entityType, Long limit,
+ Long windowStart, Long windowEnd, String fromId, Long fromTs,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
+ LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
+ List<TimelineStore> stores = getTimelineStoresForRead(entityType,
+ primaryFilter, secondaryFilters);
+ TimelineEntities returnEntities = new TimelineEntities();
+ for (TimelineStore store : stores) {
+ LOG.debug("Try timeline store {} for the request", store.getName());
+ returnEntities.addEntities(
+ store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
+ fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
+ checkAcl).getEntities());
+ }
+ return returnEntities;
+ }
+
+ @Override
+ public TimelineEntity getEntity(String entityId, String entityType,
+ EnumSet<Field> fieldsToRetrieve) throws IOException {
+ LOG.debug("getEntity type={} id={}", entityType, entityId);
+ List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
+ for (TimelineStore store : stores) {
+ LOG.debug("Try timeline store {}:{} for the request", store.getName(),
+ store.toString());
+ TimelineEntity e =
+ store.getEntity(entityId, entityType, fieldsToRetrieve);
+ if (e != null) {
+ return e;
+ }
+ }
+ LOG.debug("getEntity: Found nothing");
+ return null;
+ }
+
+ @Override
+ public TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd, Set<String> eventTypes) throws IOException {
+ LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
+ TimelineEvents returnEvents = new TimelineEvents();
+ for (String entityId : entityIds) {
+ LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
+ List<TimelineStore> stores
+ = getTimelineStoresForRead(entityId, entityType);
+ for (TimelineStore store : stores) {
+ LOG.debug("Try timeline store {}:{} for the request", store.getName(),
+ store.toString());
+ SortedSet<String> entityIdSet = new TreeSet<>();
+ entityIdSet.add(entityId);
+ TimelineEvents events =
+ store.getEntityTimelines(entityType, entityIdSet, limit,
+ windowStart, windowEnd, eventTypes);
+ returnEvents.addEvents(events.getAllEvents());
+ }
+ }
+ return returnEvents;
+ }
+
+ @Override
+ public TimelineDomain getDomain(String domainId) throws IOException {
+ return summaryStore.getDomain(domainId);
+ }
+
+ @Override
+ public TimelineDomains getDomains(String owner) throws IOException {
+ return summaryStore.getDomains(owner);
+ }
+
+ @Override
+ public TimelinePutResponse put(TimelineEntities data) throws IOException {
+ return summaryStore.put(data);
+ }
+
+ @Override
+ public void put(TimelineDomain domain) throws IOException {
+ summaryStore.put(domain);
+ }
+}
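Taken together, the read path above behaves like any other TimelineStore: plugins map a request to entity group ids, group ids resolve to cached per-group stores, and requests no plugin claims fall back to the summary store. A minimal usage sketch, with a hypothetical entity id/type and a preconfigured Configuration; this is an illustration, not part of the patch:

  // Illustrative only: assumes conf already names valid active/done
  // directories, a summary store, and the plugin classes to load.
  EntityGroupFSTimelineStore store = new EntityGroupFSTimelineStore();
  store.init(conf);
  store.start();
  TimelineEntity entity = store.getEntity(
      "entity_1", "MY_ENTITY_TYPE", EnumSet.allOf(Field.class));
  // null means no plugin claimed the id and the summary store had no match
  store.stop();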
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
new file mode 100644
index 0000000..4caed8d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.MappingIterator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+abstract class LogInfo {
+ public static final String ENTITY_FILE_NAME_DELIMITERS = "_.";
+
+ public String getAttemptDirName() {
+ return attemptDirName;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long newOffset) {
+ this.offset = newOffset;
+ }
+
+ private String attemptDirName;
+ private String filename;
+ private String user;
+ private long offset = 0;
+
+ private static final Logger LOG = LoggerFactory.getLogger(LogInfo.class);
+
+ public LogInfo(String attemptDirName, String file, String owner) {
+ this.attemptDirName = attemptDirName;
+ filename = file;
+ user = owner;
+ }
+
+ public Path getPath(Path rootPath) {
+ Path attemptPath = new Path(rootPath, attemptDirName);
+ return new Path(attemptPath, filename);
+ }
+
+ public String getFilename() {
+ return filename;
+ }
+
+ public boolean matchesGroupId(TimelineEntityGroupId groupId) {
+ return matchesGroupId(groupId.toString());
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ boolean matchesGroupId(String groupId) {
+ // Return true if the group id is a segment (separated by _, ., or end of
+ // string) of the file name.
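+ // For example, group id "g1" matches "g1", "g1_2.log" and "abc_g1",
+ // but not "g10".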
+ int pos = filename.indexOf(groupId);
+ if (pos < 0) {
+ return false;
+ }
+ return filename.length() == pos + groupId.length()
+ || ENTITY_FILE_NAME_DELIMITERS.contains(String.valueOf(
+ filename.charAt(pos + groupId.length())
+ ));
+ }
+
+ public void parseForStore(TimelineDataManager tdm, Path appDirPath,
+ boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
+ FileSystem fs) throws IOException {
+ LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
+ attemptDirName);
+ Path logPath = getPath(appDirPath);
+ if (fs.exists(logPath)) {
+ long startTime = Time.monotonicNow();
+ try {
+ LOG.debug("Parsing {} at offset {}", logPath, offset);
+ long count = parsePath(tdm, logPath, appCompleted, jsonFactory,
+ objMapper, fs);
+ LOG.info("Parsed {} entities from {} in {} msec",
+ count, logPath, Time.monotonicNow() - startTime);
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof JsonParseException) {
+ // If AppLogs cannot parse this log, it may be corrupted; skip it
+ LOG.info("Log {} appears to be corrupted, skipping", logPath);
+ }
+ }
+ } else {
+ LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
+ }
+ }
+
+ private long parsePath(TimelineDataManager tdm, Path logPath,
+ boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
+ FileSystem fs) throws IOException {
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(user);
+ FSDataInputStream in = fs.open(logPath);
+ JsonParser parser = null;
+ try {
+ in.seek(offset);
+ try {
+ parser = jsonFactory.createJsonParser(in);
+ parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+ } catch (IOException e) {
+ // if app hasn't completed then there may be errors due to the
+ // incomplete file which are treated as EOF until app completes
+ if (appCompleted) {
+ throw e;
+ } else {
+ LOG.debug("Exception in parse path: {}", e.getMessage());
+ return 0;
+ }
+ }
+
+ return doParse(tdm, parser, objMapper, ugi, appCompleted);
+ } finally {
+ IOUtils.closeStream(parser);
+ IOUtils.closeStream(in);
+ }
+ }
+
+ protected abstract long doParse(TimelineDataManager tdm, JsonParser parser,
+ ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
+ throws IOException;
+}
+
+class EntityLogInfo extends LogInfo {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ EntityGroupFSTimelineStore.class);
+
+ public EntityLogInfo(String attemptId,
+ String file, String owner) {
+ super(attemptId, file, owner);
+ }
+
+ @Override
+ protected long doParse(TimelineDataManager tdm, JsonParser parser,
+ ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
+ throws IOException {
+ long count = 0;
+ TimelineEntities entities = new TimelineEntities();
+ ArrayList<TimelineEntity> entityList = new ArrayList<TimelineEntity>(1);
+ long bytesParsed;
+ long bytesParsedLastBatch = 0;
+ boolean postError = false;
+ try {
+ MappingIterator<TimelineEntity> iter = objMapper.readValues(parser,
+ TimelineEntity.class);
+
+ while (iter.hasNext()) {
+ TimelineEntity entity = iter.next();
+ String etype = entity.getEntityType();
+ String eid = entity.getEntityId();
+ LOG.trace("Read entity {}", etype);
+ ++count;
+ bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
+ LOG.trace("Parser now at offset {}", bytesParsed);
+
+ try {
+ LOG.debug("Adding {}({}) to store", eid, etype);
+ entityList.add(entity);
+ entities.setEntities(entityList);
+ TimelinePutResponse response = tdm.postEntities(entities, ugi);
+ for (TimelinePutResponse.TimelinePutError e
+ : response.getErrors()) {
+ LOG.warn("Error putting entity: {} ({}): {}",
+ e.getEntityId(), e.getEntityType(), e.getErrorCode());
+ }
+ setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
+ bytesParsedLastBatch = bytesParsed;
+ entityList.clear();
+ } catch (YarnException | IOException e) {
+ postError = true;
+ throw new IOException("Error posting entities", e);
+ }
+ }
+ } catch (IOException e) {
+ // if app hasn't completed then there may be errors due to the
+ // incomplete file which are treated as EOF until app completes
+ if (appCompleted || postError) {
+ throw e;
+ }
+ } catch (RuntimeException e) {
+ if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
+ throw e;
+ }
+ }
+ return count;
+ }
+}
+
+class DomainLogInfo extends LogInfo {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ EntityGroupFSTimelineStore.class);
+
+ public DomainLogInfo(String attemptDirName, String file,
+ String owner) {
+ super(attemptDirName, file, owner);
+ }
+
+ @Override
+ protected long doParse(TimelineDataManager tdm, JsonParser parser,
+ ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
+ throws IOException {
+ long count = 0;
+ long bytesParsed;
+ long bytesParsedLastBatch = 0;
+ boolean putError = false;
+ try {
+ MappingIterator<TimelineDomain> iter = objMapper.readValues(parser,
+ TimelineDomain.class);
+
+ while (iter.hasNext()) {
+ TimelineDomain domain = iter.next();
+ domain.setOwner(ugi.getShortUserName());
+ LOG.trace("Read domain {}", domain.getId());
+ ++count;
+ bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
+ LOG.trace("Parser now at offset {}", bytesParsed);
+
+ try {
+ tdm.putDomain(domain, ugi);
+ setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
+ bytesParsedLastBatch = bytesParsed;
+ } catch (YarnException | IOException e) {
+ putError = true;
+ throw new IOException("Error posting domain", e);
+ }
+ }
+ } catch (IOException e) {
+ // if app hasn't completed then there may be errors due to the
+ // incomplete file which are treated as EOF until app completes
+ if (appCompleted || putError) {
+ throw e;
+ }
+ } catch (RuntimeException e) {
+ if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
+ throw e;
+ }
+ }
+ return count;
+ }
+}
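The offset bookkeeping in doParse is what keeps repeated scans cheap: each successfully posted batch advances the stored offset, so the next parseForStore call seeks past bytes already ingested. A minimal driver sketch; the file names, appIsDone and scanIntervalMs are hypothetical stand-ins for the store's own scheduling:

  LogInfo log = new EntityLogInfo("appattempt_x", "entity-log-file", "user1");
  while (!appIsDone()) {
    // parses only bytes past log.getOffset(), then advances the offset
    log.parseForStore(tdm, appDirPath, false, jsonFactory, objMapper, fs);
    Thread.sleep(scanIntervalMs);
  }
  // a final pass with appCompleted == true surfaces real parse errors
  log.parseForStore(tdm, appDirPath, true, jsonFactory, objMapper, fs);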
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java
new file mode 100644
index 0000000..9cdbf5f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Plugin to map a requested query (or an entity/set of entities) to a
+ * cache id. The cache id identifies the data set that needs to be queried
+ * to serve the response for the query.
+ */
+public abstract class TimelineEntityGroupPlugin {
+
+ /**
+ * Get the {@link TimelineEntityGroupId}s for the data sets that need to be
+ * scanned to serve the query.
+ *
+ * @param entityType Entity Type being queried
+ * @param primaryFilter Primary filter being applied
+ * @param secondaryFilters Secondary filters being applied in the query
+ * @return a set of
+ * {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}s
+ * to scan, or null if the plugin does not handle the query
+ */
+ public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
+ String entityType, NameValuePair primaryFilter,
+ Collection<NameValuePair> secondaryFilters);
+
+ /**
+ * Get the {@link TimelineEntityGroupId}s for the data sets that need to be
+ * scanned to serve the query.
+ *
+ * @param entityId Entity id being requested
+ * @param entityType Entity type being queried
+ * @return a set of
+ * {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}s
+ * to scan, or null if the plugin does not handle the query
+ */
+ public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
+ String entityId,
+ String entityType);
+
+ /**
+ * Get the {@link TimelineEntityGroupId}s for the data sets that need to be
+ * scanned to serve the query.
+ *
+ * @param entityType Entity Type being queried
+ * @param entityIds Entity Ids being requested
+ * @param eventTypes Event Types being requested
+ * @return a set of
+ * {@link org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId}s
+ * to scan, or null if the plugin does not handle the query
+ */
+ public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
+ String entityType, SortedSet<String> entityIds,
+ Set<String> eventTypes);
+}
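For concreteness, a minimal plugin sketch; the class name, the single group suffix, and the assumption that entity ids embed an application id are all illustrative (the patch's own example implementation is EntityGroupPlugInForTest):

  package org.apache.hadoop.yarn.server.timeline;

  import java.util.Collection;
  import java.util.Collections;
  import java.util.Set;
  import java.util.SortedSet;

  import org.apache.hadoop.yarn.api.records.ApplicationId;
  import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
  import org.apache.hadoop.yarn.util.ConverterUtils;

  public class SingleGroupPlugin extends TimelineEntityGroupPlugin {
    @Override
    public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
        String entityId, String entityType) {
      // Illustrative convention: entity ids are application id strings,
      // and all of an app's detail entities live in one group.
      ApplicationId appId = ConverterUtils.toApplicationId(entityId);
      return Collections.singleton(
          TimelineEntityGroupId.newInstance(appId, "all"));
    }

    @Override
    public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
        String entityType, NameValuePair primaryFilter,
        Collection<NameValuePair> secondaryFilters) {
      return null; // unclaimed: reads fall back to the summary store
    }

    @Override
    public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
        String entityType, SortedSet<String> entityIds,
        Set<String> eventTypes) {
      return null;
    }
  }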
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
new file mode 100644
index 0000000..6f61bba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file