You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/02 18:28:34 UTC
[18/30] ambari git commit: AMBARI-5707. Replace Ganglia with high
performant and pluggable Metrics System. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
new file mode 100644
index 0000000..e15198b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+public class ApplicationHistoryClientService extends AbstractService {
+ private static final Log LOG = LogFactory
+ .getLog(ApplicationHistoryClientService.class);
+ private ApplicationHistoryManager history;
+ private ApplicationHistoryProtocol protocolHandler;
+ private Server server;
+ private InetSocketAddress bindAddress;
+
+ public ApplicationHistoryClientService(ApplicationHistoryManager history) {
+ super("ApplicationHistoryClientService");
+ this.history = history;
+ this.protocolHandler = new ApplicationHSClientProtocolHandler();
+ }
+
+ protected void serviceStart() throws Exception {
+ Configuration conf = getConfig();
+ YarnRPC rpc = YarnRPC.create(conf);
+ InetSocketAddress address =
+ conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
+
+ server =
+ rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
+ address, conf, null, conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_HANDLER_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT));
+
+ server.start();
+ this.bindAddress =
+ conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+ server.getListenerAddress());
+ LOG.info("Instantiated ApplicationHistoryClientService at "
+ + this.bindAddress);
+
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ super.serviceStop();
+ }
+
+ @Private
+ public ApplicationHistoryProtocol getClientHandler() {
+ return this.protocolHandler;
+ }
+
+ @Private
+ public InetSocketAddress getBindAddress() {
+ return this.bindAddress;
+ }
+
+ private class ApplicationHSClientProtocolHandler implements
+ ApplicationHistoryProtocol {
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request) throws YarnException,
+ IOException {
+ try {
+ GetApplicationAttemptReportResponse response =
+ GetApplicationAttemptReportResponse.newInstance(history
+ .getApplicationAttempt(request.getApplicationAttemptId()));
+ return response;
+ } catch (IOException e) {
+ throw new ApplicationAttemptNotFoundException(e.getMessage());
+ }
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException,
+ IOException {
+ GetApplicationAttemptsResponse response =
+ GetApplicationAttemptsResponse
+ .newInstance(new ArrayList<ApplicationAttemptReport>(history
+ .getApplicationAttempts(request.getApplicationId()).values()));
+ return response;
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ try {
+ ApplicationId applicationId = request.getApplicationId();
+ GetApplicationReportResponse response =
+ GetApplicationReportResponse.newInstance(history
+ .getApplication(applicationId));
+ return response;
+ } catch (IOException e) {
+ throw new ApplicationNotFoundException(e.getMessage());
+ }
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(
+ GetApplicationsRequest request) throws YarnException, IOException {
+ GetApplicationsResponse response =
+ GetApplicationsResponse.newInstance(new ArrayList<ApplicationReport>(
+ history.getAllApplications().values()));
+ return response;
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ try {
+ GetContainerReportResponse response =
+ GetContainerReportResponse.newInstance(history.getContainer(request
+ .getContainerId()));
+ return response;
+ } catch (IOException e) {
+ throw new ContainerNotFoundException(e.getMessage());
+ }
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ GetContainersResponse response =
+ GetContainersResponse.newInstance(new ArrayList<ContainerReport>(
+ history.getContainers(request.getApplicationAttemptId()).values()));
+ return response;
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java
new file mode 100644
index 0000000..db25d29
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManager.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.api.ApplicationContext;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryManager extends ApplicationContext {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java
new file mode 100644
index 0000000..85a5e3a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerImpl.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DISABLE_APPLICATION_TIMELINE_STORE;
+
+public class ApplicationHistoryManagerImpl extends AbstractService implements
+ ApplicationHistoryManager {
+ private static final Log LOG = LogFactory
+ .getLog(ApplicationHistoryManagerImpl.class);
+ private static final String UNAVAILABLE = "N/A";
+
+ private ApplicationHistoryStore historyStore;
+ private String serverHttpAddress;
+
+ public ApplicationHistoryManagerImpl() {
+ super(ApplicationHistoryManagerImpl.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ LOG.info("ApplicationHistory Init");
+ historyStore = createApplicationHistoryStore(conf);
+ historyStore.init(conf);
+ serverHttpAddress = WebAppUtils.getHttpSchemePrefix(conf) +
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting ApplicationHistory");
+ historyStore.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ LOG.info("Stopping ApplicationHistory");
+ historyStore.stop();
+ super.serviceStop();
+ }
+
+ protected ApplicationHistoryStore createApplicationHistoryStore(
+ Configuration conf) {
+ if (conf.getBoolean(DISABLE_APPLICATION_TIMELINE_STORE, true)) {
+ LOG.info("Explicitly disabled application timeline store.");
+ return new NullApplicationHistoryStore();
+ }
+ return ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.APPLICATION_HISTORY_STORE,
+ NullApplicationHistoryStore.class,
+ ApplicationHistoryStore.class), conf);
+ }
+
+ @Override
+ public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException {
+ ApplicationReport app =
+ getApplication(appAttemptId.getApplicationId());
+ return convertToContainerReport(historyStore.getAMContainer(appAttemptId),
+ app == null ? null : app.getUser());
+ }
+
+ @Override
+ public Map<ApplicationId, ApplicationReport> getAllApplications()
+ throws IOException {
+ Map<ApplicationId, ApplicationHistoryData> histData =
+ historyStore.getAllApplications();
+ HashMap<ApplicationId, ApplicationReport> applicationsReport =
+ new HashMap<ApplicationId, ApplicationReport>();
+ for (Entry<ApplicationId, ApplicationHistoryData> entry : histData
+ .entrySet()) {
+ applicationsReport.put(entry.getKey(),
+ convertToApplicationReport(entry.getValue()));
+ }
+ return applicationsReport;
+ }
+
+ @Override
+ public ApplicationReport getApplication(ApplicationId appId)
+ throws IOException {
+ return convertToApplicationReport(historyStore.getApplication(appId));
+ }
+
+ private ApplicationReport convertToApplicationReport(
+ ApplicationHistoryData appHistory) throws IOException {
+ ApplicationAttemptId currentApplicationAttemptId = null;
+ String trackingUrl = UNAVAILABLE;
+ String host = UNAVAILABLE;
+ int rpcPort = -1;
+
+ ApplicationAttemptHistoryData lastAttempt =
+ getLastAttempt(appHistory.getApplicationId());
+ if (lastAttempt != null) {
+ currentApplicationAttemptId = lastAttempt.getApplicationAttemptId();
+ trackingUrl = lastAttempt.getTrackingURL();
+ host = lastAttempt.getHost();
+ rpcPort = lastAttempt.getRPCPort();
+ }
+ return ApplicationReport.newInstance(appHistory.getApplicationId(),
+ currentApplicationAttemptId, appHistory.getUser(), appHistory.getQueue(),
+ appHistory.getApplicationName(), host, rpcPort, null,
+ appHistory.getYarnApplicationState(), appHistory.getDiagnosticsInfo(),
+ trackingUrl, appHistory.getStartTime(), appHistory.getFinishTime(),
+ appHistory.getFinalApplicationStatus(), null, "", 100,
+ appHistory.getApplicationType(), null);
+ }
+
+ private ApplicationAttemptHistoryData getLastAttempt(ApplicationId appId)
+ throws IOException {
+ Map<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts =
+ historyStore.getApplicationAttempts(appId);
+ ApplicationAttemptId prevMaxAttemptId = null;
+ for (ApplicationAttemptId attemptId : attempts.keySet()) {
+ if (prevMaxAttemptId == null) {
+ prevMaxAttemptId = attemptId;
+ } else {
+ if (prevMaxAttemptId.getAttemptId() < attemptId.getAttemptId()) {
+ prevMaxAttemptId = attemptId;
+ }
+ }
+ }
+ return attempts.get(prevMaxAttemptId);
+ }
+
+ private ApplicationAttemptReport convertToApplicationAttemptReport(
+ ApplicationAttemptHistoryData appAttemptHistory) {
+ return ApplicationAttemptReport.newInstance(
+ appAttemptHistory.getApplicationAttemptId(), appAttemptHistory.getHost(),
+ appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(),
+ appAttemptHistory.getDiagnosticsInfo(),
+ appAttemptHistory.getYarnApplicationAttemptState(),
+ appAttemptHistory.getMasterContainerId());
+ }
+
+ @Override
+ public ApplicationAttemptReport getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ return convertToApplicationAttemptReport(historyStore
+ .getApplicationAttempt(appAttemptId));
+ }
+
+ @Override
+ public Map<ApplicationAttemptId, ApplicationAttemptReport>
+ getApplicationAttempts(ApplicationId appId) throws IOException {
+ Map<ApplicationAttemptId, ApplicationAttemptHistoryData> histData =
+ historyStore.getApplicationAttempts(appId);
+ HashMap<ApplicationAttemptId, ApplicationAttemptReport> applicationAttemptsReport =
+ new HashMap<ApplicationAttemptId, ApplicationAttemptReport>();
+ for (Entry<ApplicationAttemptId, ApplicationAttemptHistoryData> entry : histData
+ .entrySet()) {
+ applicationAttemptsReport.put(entry.getKey(),
+ convertToApplicationAttemptReport(entry.getValue()));
+ }
+ return applicationAttemptsReport;
+ }
+
+ @Override
+ public ContainerReport getContainer(ContainerId containerId)
+ throws IOException {
+ ApplicationReport app =
+ getApplication(containerId.getApplicationAttemptId().getApplicationId());
+ return convertToContainerReport(historyStore.getContainer(containerId),
+ app == null ? null: app.getUser());
+ }
+
+ private ContainerReport convertToContainerReport(
+ ContainerHistoryData containerHistory, String user) {
+ // If the container has the aggregated log, add the server root url
+ String logUrl = WebAppUtils.getAggregatedLogURL(
+ serverHttpAddress,
+ containerHistory.getAssignedNode().toString(),
+ containerHistory.getContainerId().toString(),
+ containerHistory.getContainerId().toString(),
+ user);
+ return ContainerReport.newInstance(containerHistory.getContainerId(),
+ containerHistory.getAllocatedResource(),
+ containerHistory.getAssignedNode(), containerHistory.getPriority(),
+ containerHistory.getStartTime(), containerHistory.getFinishTime(),
+ containerHistory.getDiagnosticsInfo(), logUrl,
+ containerHistory.getContainerExitStatus(),
+ containerHistory.getContainerState());
+ }
+
+ @Override
+ public Map<ContainerId, ContainerReport> getContainers(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ ApplicationReport app =
+ getApplication(appAttemptId.getApplicationId());
+ Map<ContainerId, ContainerHistoryData> histData =
+ historyStore.getContainers(appAttemptId);
+ HashMap<ContainerId, ContainerReport> containersReport =
+ new HashMap<ContainerId, ContainerReport>();
+ for (Entry<ContainerId, ContainerHistoryData> entry : histData.entrySet()) {
+ containersReport.put(entry.getKey(),
+ convertToContainerReport(entry.getValue(),
+ app == null ? null : app.getUser()));
+ }
+ return containersReport;
+ }
+
+ @Private
+ @VisibleForTesting
+ public ApplicationHistoryStore getHistoryStore() {
+ return this.historyStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
new file mode 100644
index 0000000..590853a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryReader {
+
+ /**
+ * This method returns Application {@link ApplicationHistoryData} for the
+ * specified {@link ApplicationId}.
+ *
+ * @param appId
+ *
+ * @return {@link ApplicationHistoryData} for the ApplicationId.
+ * @throws IOException
+ */
+ ApplicationHistoryData getApplication(ApplicationId appId) throws IOException;
+
+ /**
+ * This method returns all Application {@link ApplicationHistoryData}s
+ *
+ * @return map of {@link ApplicationId} to {@link ApplicationHistoryData}s.
+ * @throws IOException
+ */
+ Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+ throws IOException;
+
+ /**
+ * Application can have multiple application attempts
+ * {@link ApplicationAttemptHistoryData}. This method returns the all
+ * {@link ApplicationAttemptHistoryData}s for the Application.
+ *
+ * @param appId
+ *
+ * @return all {@link ApplicationAttemptHistoryData}s for the Application.
+ * @throws IOException
+ */
+ Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+ getApplicationAttempts(ApplicationId appId) throws IOException;
+
+ /**
+ * This method returns {@link ApplicationAttemptHistoryData} for specified
+ * {@link ApplicationId}.
+ *
+ * @param appAttemptId
+ * {@link ApplicationAttemptId}
+ * @return {@link ApplicationAttemptHistoryData} for ApplicationAttemptId
+ * @throws IOException
+ */
+ ApplicationAttemptHistoryData getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws IOException;
+
+ /**
+ * This method returns {@link ContainerHistoryData} for specified
+ * {@link ContainerId}.
+ *
+ * @param containerId
+ * {@link ContainerId}
+ * @return {@link ContainerHistoryData} for ContainerId
+ * @throws IOException
+ */
+ ContainerHistoryData getContainer(ContainerId containerId) throws IOException;
+
+ /**
+ * This method returns {@link ContainerHistoryData} for specified
+ * {@link ApplicationAttemptId}.
+ *
+ * @param appAttemptId
+ * {@link ApplicationAttemptId}
+ * @return {@link ContainerHistoryData} for ApplicationAttemptId
+ * @throws IOException
+ */
+ ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException;
+
+ /**
+ * This method returns Map{@link ContainerId} to {@link ContainerHistoryData}
+ * for specified {@link ApplicationAttemptId}.
+ *
+ * @param appAttemptId
+ * {@link ApplicationAttemptId}
+ * @return Map{@link ContainerId} to {@link ContainerHistoryData} for
+ * ApplicationAttemptId
+ * @throws IOException
+ */
+ Map<ContainerId, ContainerHistoryData> getContainers(
+ ApplicationAttemptId appAttemptId) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
new file mode 100644
index 0000000..3adb3b8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
+
+/**
+ * History server that keeps track of all types of history in the cluster.
+ * Application specific history to start with.
+ */
+public class ApplicationHistoryServer extends CompositeService {
+
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+ private static final Log LOG = LogFactory
+ .getLog(ApplicationHistoryServer.class);
+
+ ApplicationHistoryClientService ahsClientService;
+ ApplicationHistoryManager historyManager;
+ TimelineStore timelineStore;
+ TimelineMetricStore timelineMetricStore;
+ private WebApp webApp;
+
+ public ApplicationHistoryServer() {
+ super(ApplicationHistoryServer.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ historyManager = createApplicationHistory();
+ ahsClientService = createApplicationHistoryClientService(historyManager);
+ addService(ahsClientService);
+ addService((Service) historyManager);
+ timelineStore = createTimelineStore(conf);
+ timelineMetricStore = createTimelineMetricStore(conf);
+ addIfService(timelineStore);
+ addIfService(timelineMetricStore);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ DefaultMetricsSystem.initialize("ApplicationHistoryServer");
+ JvmMetrics.initSingleton("ApplicationHistoryServer", null);
+
+ startWebApp();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (webApp != null) {
+ webApp.stop();
+ }
+
+ DefaultMetricsSystem.shutdown();
+ super.serviceStop();
+ }
+
+ @Private
+ @VisibleForTesting
+ public ApplicationHistoryClientService getClientService() {
+ return this.ahsClientService;
+ }
+
+ protected ApplicationHistoryClientService
+ createApplicationHistoryClientService(
+ ApplicationHistoryManager historyManager) {
+ return new ApplicationHistoryClientService(historyManager);
+ }
+
+ protected ApplicationHistoryManager createApplicationHistory() {
+ return new ApplicationHistoryManagerImpl();
+ }
+
+ protected ApplicationHistoryManager getApplicationHistory() {
+ return this.historyManager;
+ }
+
+ static ApplicationHistoryServer launchAppHistoryServer(String[] args) {
+ Thread
+ .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(ApplicationHistoryServer.class, args,
+ LOG);
+ ApplicationHistoryServer appHistoryServer = null;
+ try {
+ appHistoryServer = new ApplicationHistoryServer();
+ ShutdownHookManager.get().addShutdownHook(
+ new CompositeServiceShutdownHook(appHistoryServer),
+ SHUTDOWN_HOOK_PRIORITY);
+ YarnConfiguration conf = new YarnConfiguration();
+ appHistoryServer.init(conf);
+ appHistoryServer.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting ApplicationHistoryServer", t);
+ ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer");
+ }
+ return appHistoryServer;
+ }
+
+ public static void main(String[] args) {
+ launchAppHistoryServer(args);
+ }
+
+ protected ApplicationHistoryManager createApplicationHistoryManager(
+ Configuration conf) {
+ return new ApplicationHistoryManagerImpl();
+ }
+
+ protected TimelineStore createTimelineStore(Configuration conf) {
+ if (conf.getBoolean(DISABLE_APPLICATION_TIMELINE_STORE, true)) {
+ LOG.info("Explicitly disabled application timeline store.");
+ return new MemoryTimelineStore();
+ }
+ return ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
+ TimelineStore.class), conf);
+ }
+
+ protected TimelineMetricStore createTimelineMetricStore(Configuration conf) {
+ LOG.info("Creating metrics store.");
+ return ReflectionUtils.newInstance(HBaseTimelineMetricStore.class, conf);
+ }
+
+ protected void startWebApp() {
+ String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(getConfig());
+ LOG.info("Instantiating AHSWebApp at " + bindAddress);
+ try {
+ webApp =
+ WebApps
+ .$for("applicationhistory", ApplicationHistoryClientService.class,
+ ahsClientService, "ws")
+ .with(getConfig())
+ .withHttpSpnegoPrincipalKey(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY)
+ .withHttpSpnegoKeytabKey(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
+ .at(bindAddress)
+ .start(new AHSWebApp(historyManager, timelineStore, timelineMetricStore));
+ } catch (Exception e) {
+ String msg = "AHSWebApp failed to start.";
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+ /**
+ * @return ApplicationTimelineStore
+ */
+ @Private
+ @VisibleForTesting
+ public TimelineStore getTimelineStore() {
+ return timelineStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
new file mode 100644
index 0000000..c26faef
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+/**
+ * This class is the abstract of the storage of the application history data. It
+ * is a {@link Service}, such that the implementation of this class can make use
+ * of the service life cycle to initialize and cleanup the storage. Users can
+ * access the storage via {@link ApplicationHistoryReader} and
+ * {@link ApplicationHistoryWriter} interfaces.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryStore extends Service,
+ ApplicationHistoryReader, ApplicationHistoryWriter {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
new file mode 100644
index 0000000..09ba36d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * It is the interface of writing the application history, exposing the methods
+ * of writing {@link ApplicationStartData}, {@link ApplicationFinishData}
+ * {@link ApplicationAttemptStartData}, {@link ApplicationAttemptFinishData},
+ * {@link ContainerStartData} and {@link ContainerFinishData}.
+ */
+@Private
+@Unstable
+public interface ApplicationHistoryWriter {
+
+ /**
+ * This method writes the information of <code>RMApp</code> that is available
+ * when it starts.
+ *
+ * @param appStart
+ * the record of the information of <code>RMApp</code> that is
+ * available when it starts
+ * @throws IOException
+ */
+ void applicationStarted(ApplicationStartData appStart) throws IOException;
+
+ /**
+ * This method writes the information of <code>RMApp</code> that is available
+ * when it finishes.
+ *
+ * @param appFinish
+ * the record of the information of <code>RMApp</code> that is
+ * available when it finishes
+ * @throws IOException
+ */
+ void applicationFinished(ApplicationFinishData appFinish) throws IOException;
+
+ /**
+ * This method writes the information of <code>RMAppAttempt</code> that is
+ * available when it starts.
+ *
+ * @param appAttemptStart
+ * the record of the information of <code>RMAppAttempt</code> that is
+ * available when it starts
+ * @throws IOException
+ */
+ void applicationAttemptStarted(ApplicationAttemptStartData appAttemptStart)
+ throws IOException;
+
+ /**
+ * This method writes the information of <code>RMAppAttempt</code> that is
+ * available when it finishes.
+ *
+ * @param appAttemptFinish
+ * the record of the information of <code>RMAppAttempt</code> that is
+ * available when it finishes
+ * @throws IOException
+ */
+ void
+ applicationAttemptFinished(ApplicationAttemptFinishData appAttemptFinish)
+ throws IOException;
+
+ /**
+ * This method writes the information of <code>RMContainer</code> that is
+ * available when it starts.
+ *
+ * @param containerStart
+ * the record of the information of <code>RMContainer</code> that is
+ * available when it starts
+ * @throws IOException
+ */
+ void containerStarted(ContainerStartData containerStart) throws IOException;
+
+ /**
+ * This method writes the information of <code>RMContainer</code> that is
+ * available when it finishes.
+ *
+ * @param containerFinish
+ * the record of the information of <code>RMContainer</code> that is
+ * available when it finishes
+ * @throws IOException
+ */
+ void containerFinished(ContainerFinishData containerFinish)
+ throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..4c8d745
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * File system implementation of {@link ApplicationHistoryStore}. In this
+ * implementation, one application will have just one file in the file system,
+ * which contains all the history data of one application, and its attempts and
+ * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to
+ * be invoked first when writing any history data of one application and it will
+ * open a file, while {@link #applicationFinished(ApplicationFinishData)} is
+ * supposed to be last writing operation and will close the file.
+ */
+@Public
+@Unstable
+public class FileSystemApplicationHistoryStore extends AbstractService
+ implements ApplicationHistoryStore {
+
+ private static final Log LOG = LogFactory
+ .getLog(FileSystemApplicationHistoryStore.class);
+
+ private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+ private static final int MIN_BLOCK_SIZE = 256 * 1024;
+ private static final String START_DATA_SUFFIX = "_start";
+ private static final String FINISH_DATA_SUFFIX = "_finish";
+ private static final FsPermission ROOT_DIR_UMASK = FsPermission
+ .createImmutable((short) 0740);
+ private static final FsPermission HISTORY_FILE_UMASK = FsPermission
+ .createImmutable((short) 0640);
+
+ private FileSystem fs;
+ private Path rootDirPath;
+
+ private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters =
+ new ConcurrentHashMap<ApplicationId, HistoryFileWriter>();
+
+ public FileSystemApplicationHistoryStore() {
+ super(FileSystemApplicationHistoryStore.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ Path fsWorkingPath =
+ new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI));
+ rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ try {
+ fs = fsWorkingPath.getFileSystem(conf);
+ fs.mkdirs(rootDirPath);
+ fs.setPermission(rootDirPath, ROOT_DIR_UMASK);
+ } catch (IOException e) {
+ LOG.error("Error when initializing FileSystemHistoryStorage", e);
+ throw e;
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ try {
+ for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters
+ .entrySet()) {
+ entry.getValue().close();
+ }
+ outstandingWriters.clear();
+ } finally {
+ IOUtils.cleanup(LOG, fs);
+ }
+ super.serviceStop();
+ }
+
+ @Override
+ public ApplicationHistoryData getApplication(ApplicationId appId)
+ throws IOException {
+ HistoryFileReader hfReader = getHistoryFileReader(appId);
+ try {
+ boolean readStartData = false;
+ boolean readFinishData = false;
+ ApplicationHistoryData historyData =
+ ApplicationHistoryData.newInstance(appId, null, null, null, null,
+ Long.MIN_VALUE, Long.MIN_VALUE, Long.MAX_VALUE, null,
+ FinalApplicationStatus.UNDEFINED, null);
+ while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.equals(appId.toString())) {
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ ApplicationStartData startData =
+ parseApplicationStartData(entry.value);
+ mergeApplicationHistoryData(historyData, startData);
+ readStartData = true;
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ ApplicationFinishData finishData =
+ parseApplicationFinishData(entry.value);
+ mergeApplicationHistoryData(historyData, finishData);
+ readFinishData = true;
+ }
+ }
+ }
+ if (!readStartData && !readFinishData) {
+ return null;
+ }
+ if (!readStartData) {
+ LOG.warn("Start information is missing for application " + appId);
+ }
+ if (!readFinishData) {
+ LOG.warn("Finish information is missing for application " + appId);
+ }
+ LOG.info("Completed reading history information of application " + appId);
+ return historyData;
+ } catch (IOException e) {
+ LOG.error("Error when reading history file of application " + appId);
+ throw e;
+ } finally {
+ hfReader.close();
+ }
+ }
+
+ @Override
+ public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+ throws IOException {
+ Map<ApplicationId, ApplicationHistoryData> historyDataMap =
+ new HashMap<ApplicationId, ApplicationHistoryData>();
+ FileStatus[] files = fs.listStatus(rootDirPath);
+ for (FileStatus file : files) {
+ ApplicationId appId =
+ ConverterUtils.toApplicationId(file.getPath().getName());
+ try {
+ ApplicationHistoryData historyData = getApplication(appId);
+ if (historyData != null) {
+ historyDataMap.put(appId, historyData);
+ }
+ } catch (IOException e) {
+ // Eat the exception not to disturb the getting the next
+ // ApplicationHistoryData
+ LOG.error("History information of application " + appId
+ + " is not included into the result due to the exception", e);
+ }
+ }
+ return historyDataMap;
+ }
+
+ @Override
+ public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+ getApplicationAttempts(ApplicationId appId) throws IOException {
+ Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
+ new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
+ HistoryFileReader hfReader = getHistoryFileReader(appId);
+ try {
+ while (hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.startsWith(
+ ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
+ ApplicationAttemptId appAttemptId =
+ ConverterUtils.toApplicationAttemptId(entry.key.id);
+ if (appAttemptId.getApplicationId().equals(appId)) {
+ ApplicationAttemptHistoryData historyData =
+ historyDataMap.get(appAttemptId);
+ if (historyData == null) {
+ historyData = ApplicationAttemptHistoryData.newInstance(
+ appAttemptId, null, -1, null, null, null,
+ FinalApplicationStatus.UNDEFINED, null);
+ historyDataMap.put(appAttemptId, historyData);
+ }
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ mergeApplicationAttemptHistoryData(historyData,
+ parseApplicationAttemptStartData(entry.value));
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ mergeApplicationAttemptHistoryData(historyData,
+ parseApplicationAttemptFinishData(entry.value));
+ }
+ }
+ }
+ }
+ LOG.info("Completed reading history information of all application"
+ + " attempts of application " + appId);
+ } catch (IOException e) {
+ LOG.info("Error when reading history information of some application"
+ + " attempts of application " + appId);
+ } finally {
+ hfReader.close();
+ }
+ return historyDataMap;
+ }
+
+ @Override
+ public ApplicationAttemptHistoryData getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ HistoryFileReader hfReader =
+ getHistoryFileReader(appAttemptId.getApplicationId());
+ try {
+ boolean readStartData = false;
+ boolean readFinishData = false;
+ ApplicationAttemptHistoryData historyData =
+ ApplicationAttemptHistoryData.newInstance(appAttemptId, null, -1,
+ null, null, null, FinalApplicationStatus.UNDEFINED, null);
+ while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.equals(appAttemptId.toString())) {
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ ApplicationAttemptStartData startData =
+ parseApplicationAttemptStartData(entry.value);
+ mergeApplicationAttemptHistoryData(historyData, startData);
+ readStartData = true;
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ ApplicationAttemptFinishData finishData =
+ parseApplicationAttemptFinishData(entry.value);
+ mergeApplicationAttemptHistoryData(historyData, finishData);
+ readFinishData = true;
+ }
+ }
+ }
+ if (!readStartData && !readFinishData) {
+ return null;
+ }
+ if (!readStartData) {
+ LOG.warn("Start information is missing for application attempt "
+ + appAttemptId);
+ }
+ if (!readFinishData) {
+ LOG.warn("Finish information is missing for application attempt "
+ + appAttemptId);
+ }
+ LOG.info("Completed reading history information of application attempt "
+ + appAttemptId);
+ return historyData;
+ } catch (IOException e) {
+ LOG.error("Error when reading history file of application attempt"
+ + appAttemptId);
+ throw e;
+ } finally {
+ hfReader.close();
+ }
+ }
+
+ @Override
+ public ContainerHistoryData getContainer(ContainerId containerId)
+ throws IOException {
+ HistoryFileReader hfReader =
+ getHistoryFileReader(containerId.getApplicationAttemptId()
+ .getApplicationId());
+ try {
+ boolean readStartData = false;
+ boolean readFinishData = false;
+ ContainerHistoryData historyData =
+ ContainerHistoryData
+ .newInstance(containerId, null, null, null, Long.MIN_VALUE,
+ Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
+ while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.equals(containerId.toString())) {
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ ContainerStartData startData = parseContainerStartData(entry.value);
+ mergeContainerHistoryData(historyData, startData);
+ readStartData = true;
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ ContainerFinishData finishData =
+ parseContainerFinishData(entry.value);
+ mergeContainerHistoryData(historyData, finishData);
+ readFinishData = true;
+ }
+ }
+ }
+ if (!readStartData && !readFinishData) {
+ return null;
+ }
+ if (!readStartData) {
+ LOG.warn("Start information is missing for container " + containerId);
+ }
+ if (!readFinishData) {
+ LOG.warn("Finish information is missing for container " + containerId);
+ }
+ LOG.info("Completed reading history information of container "
+ + containerId);
+ return historyData;
+ } catch (IOException e) {
+ LOG.error("Error when reading history file of container " + containerId);
+ throw e;
+ } finally {
+ hfReader.close();
+ }
+ }
+
+ @Override
+ public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException {
+ ApplicationAttemptHistoryData attemptHistoryData =
+ getApplicationAttempt(appAttemptId);
+ if (attemptHistoryData == null
+ || attemptHistoryData.getMasterContainerId() == null) {
+ return null;
+ }
+ return getContainer(attemptHistoryData.getMasterContainerId());
+ }
+
+ @Override
+ public Map<ContainerId, ContainerHistoryData> getContainers(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ Map<ContainerId, ContainerHistoryData> historyDataMap =
+ new HashMap<ContainerId, ContainerHistoryData>();
+ HistoryFileReader hfReader =
+ getHistoryFileReader(appAttemptId.getApplicationId());
+ try {
+ while (hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+ ContainerId containerId =
+ ConverterUtils.toContainerId(entry.key.id);
+ if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
+ ContainerHistoryData historyData =
+ historyDataMap.get(containerId);
+ if (historyData == null) {
+ historyData = ContainerHistoryData.newInstance(
+ containerId, null, null, null, Long.MIN_VALUE,
+ Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
+ historyDataMap.put(containerId, historyData);
+ }
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ mergeContainerHistoryData(historyData,
+ parseContainerStartData(entry.value));
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ mergeContainerHistoryData(historyData,
+ parseContainerFinishData(entry.value));
+ }
+ }
+ }
+ }
+ LOG.info("Completed reading history information of all conatiners"
+ + " of application attempt " + appAttemptId);
+ } catch (IOException e) {
+ LOG.info("Error when reading history information of some containers"
+ + " of application attempt " + appAttemptId);
+ } finally {
+ hfReader.close();
+ }
+ return historyDataMap;
+ }
+
+ @Override
+ public void applicationStarted(ApplicationStartData appStart)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ outstandingWriters.get(appStart.getApplicationId());
+ if (hfWriter == null) {
+ Path applicationHistoryFile =
+ new Path(rootDirPath, appStart.getApplicationId().toString());
+ try {
+ hfWriter = new HistoryFileWriter(applicationHistoryFile);
+ LOG.info("Opened history file of application "
+ + appStart.getApplicationId());
+ } catch (IOException e) {
+ LOG.error("Error when openning history file of application "
+ + appStart.getApplicationId());
+ throw e;
+ }
+ outstandingWriters.put(appStart.getApplicationId(), hfWriter);
+ } else {
+ throw new IOException("History file of application "
+ + appStart.getApplicationId() + " is already opened");
+ }
+ assert appStart instanceof ApplicationStartDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId()
+ .toString(), START_DATA_SUFFIX),
+ ((ApplicationStartDataPBImpl) appStart).getProto().toByteArray());
+ LOG.info("Start information of application "
+ + appStart.getApplicationId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing start information of application "
+ + appStart.getApplicationId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void applicationFinished(ApplicationFinishData appFinish)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(appFinish.getApplicationId());
+ assert appFinish instanceof ApplicationFinishDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appFinish.getApplicationId()
+ .toString(), FINISH_DATA_SUFFIX),
+ ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
+ LOG.info("Finish information of application "
+ + appFinish.getApplicationId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing finish information of application "
+ + appFinish.getApplicationId());
+ throw e;
+ } finally {
+ hfWriter.close();
+ outstandingWriters.remove(appFinish.getApplicationId());
+ }
+ }
+
+ @Override
+ public void applicationAttemptStarted(
+ ApplicationAttemptStartData appAttemptStart) throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(appAttemptStart.getApplicationAttemptId()
+ .getApplicationId());
+ assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appAttemptStart
+ .getApplicationAttemptId().toString(), START_DATA_SUFFIX),
+ ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
+ .toByteArray());
+ LOG.info("Start information of application attempt "
+ + appAttemptStart.getApplicationAttemptId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing start information of application attempt "
+ + appAttemptStart.getApplicationAttemptId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void applicationAttemptFinished(
+ ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId()
+ .getApplicationId());
+ assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appAttemptFinish
+ .getApplicationAttemptId().toString(), FINISH_DATA_SUFFIX),
+ ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
+ .toByteArray());
+ LOG.info("Finish information of application attempt "
+ + appAttemptFinish.getApplicationAttemptId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing finish information of application attempt "
+ + appAttemptFinish.getApplicationAttemptId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void containerStarted(ContainerStartData containerStart)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(containerStart.getContainerId()
+ .getApplicationAttemptId().getApplicationId());
+ assert containerStart instanceof ContainerStartDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(containerStart
+ .getContainerId().toString(), START_DATA_SUFFIX),
+ ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
+ LOG.info("Start information of container "
+ + containerStart.getContainerId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing start information of container "
+ + containerStart.getContainerId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void containerFinished(ContainerFinishData containerFinish)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(containerFinish.getContainerId()
+ .getApplicationAttemptId().getApplicationId());
+ assert containerFinish instanceof ContainerFinishDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(containerFinish
+ .getContainerId().toString(), FINISH_DATA_SUFFIX),
+ ((ContainerFinishDataPBImpl) containerFinish).getProto().toByteArray());
+ LOG.info("Finish information of container "
+ + containerFinish.getContainerId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing finish information of container "
+ + containerFinish.getContainerId());
+ }
+ }
+
+ private static ApplicationStartData parseApplicationStartData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ApplicationStartDataPBImpl(
+ ApplicationStartDataProto.parseFrom(value));
+ }
+
+ private static ApplicationFinishData parseApplicationFinishData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ApplicationFinishDataPBImpl(
+ ApplicationFinishDataProto.parseFrom(value));
+ }
+
+ private static ApplicationAttemptStartData parseApplicationAttemptStartData(
+ byte[] value) throws InvalidProtocolBufferException {
+ return new ApplicationAttemptStartDataPBImpl(
+ ApplicationAttemptStartDataProto.parseFrom(value));
+ }
+
+ private static ApplicationAttemptFinishData
+ parseApplicationAttemptFinishData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ApplicationAttemptFinishDataPBImpl(
+ ApplicationAttemptFinishDataProto.parseFrom(value));
+ }
+
+ private static ContainerStartData parseContainerStartData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ContainerStartDataPBImpl(
+ ContainerStartDataProto.parseFrom(value));
+ }
+
+ private static ContainerFinishData parseContainerFinishData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ContainerFinishDataPBImpl(
+ ContainerFinishDataProto.parseFrom(value));
+ }
+
+ private static void mergeApplicationHistoryData(
+ ApplicationHistoryData historyData, ApplicationStartData startData) {
+ historyData.setApplicationName(startData.getApplicationName());
+ historyData.setApplicationType(startData.getApplicationType());
+ historyData.setQueue(startData.getQueue());
+ historyData.setUser(startData.getUser());
+ historyData.setSubmitTime(startData.getSubmitTime());
+ historyData.setStartTime(startData.getStartTime());
+ }
+
+ private static void mergeApplicationHistoryData(
+ ApplicationHistoryData historyData, ApplicationFinishData finishData) {
+ historyData.setFinishTime(finishData.getFinishTime());
+ historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+ historyData.setFinalApplicationStatus(finishData
+ .getFinalApplicationStatus());
+ historyData.setYarnApplicationState(finishData.getYarnApplicationState());
+ }
+
+ private static void mergeApplicationAttemptHistoryData(
+ ApplicationAttemptHistoryData historyData,
+ ApplicationAttemptStartData startData) {
+ historyData.setHost(startData.getHost());
+ historyData.setRPCPort(startData.getRPCPort());
+ historyData.setMasterContainerId(startData.getMasterContainerId());
+ }
+
+ private static void mergeApplicationAttemptHistoryData(
+ ApplicationAttemptHistoryData historyData,
+ ApplicationAttemptFinishData finishData) {
+ historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+ historyData.setTrackingURL(finishData.getTrackingURL());
+ historyData.setFinalApplicationStatus(finishData
+ .getFinalApplicationStatus());
+ historyData.setYarnApplicationAttemptState(finishData
+ .getYarnApplicationAttemptState());
+ }
+
+ private static void mergeContainerHistoryData(
+ ContainerHistoryData historyData, ContainerStartData startData) {
+ historyData.setAllocatedResource(startData.getAllocatedResource());
+ historyData.setAssignedNode(startData.getAssignedNode());
+ historyData.setPriority(startData.getPriority());
+ historyData.setStartTime(startData.getStartTime());
+ }
+
+ private static void mergeContainerHistoryData(
+ ContainerHistoryData historyData, ContainerFinishData finishData) {
+ historyData.setFinishTime(finishData.getFinishTime());
+ historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+ historyData.setContainerExitStatus(finishData.getContainerExitStatus());
+ historyData.setContainerState(finishData.getContainerState());
+ }
+
+ private HistoryFileWriter getHistoryFileWriter(ApplicationId appId)
+ throws IOException {
+ HistoryFileWriter hfWriter = outstandingWriters.get(appId);
+ if (hfWriter == null) {
+ throw new IOException("History file of application " + appId
+ + " is not opened");
+ }
+ return hfWriter;
+ }
+
+ private HistoryFileReader getHistoryFileReader(ApplicationId appId)
+ throws IOException {
+ Path applicationHistoryFile = new Path(rootDirPath, appId.toString());
+ if (!fs.exists(applicationHistoryFile)) {
+ throw new IOException("History file for application " + appId
+ + " is not found");
+ }
+ // The history file is still under writing
+ if (outstandingWriters.containsKey(appId)) {
+ throw new IOException("History file for application " + appId
+ + " is under writing");
+ }
+ return new HistoryFileReader(applicationHistoryFile);
+ }
+
+ private class HistoryFileReader {
+
+ private class Entry {
+
+ private HistoryDataKey key;
+ private byte[] value;
+
+ public Entry(HistoryDataKey key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
+ private TFile.Reader reader;
+ private TFile.Reader.Scanner scanner;
+
+ public HistoryFileReader(Path historyFile) throws IOException {
+ FSDataInputStream fsdis = fs.open(historyFile);
+ reader =
+ new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
+ getConfig());
+ reset();
+ }
+
+ public boolean hasNext() {
+ return !scanner.atEnd();
+ }
+
+ public Entry next() throws IOException {
+ TFile.Reader.Scanner.Entry entry = scanner.entry();
+ DataInputStream dis = entry.getKeyStream();
+ HistoryDataKey key = new HistoryDataKey();
+ key.readFields(dis);
+ dis = entry.getValueStream();
+ byte[] value = new byte[entry.getValueLength()];
+ dis.read(value);
+ scanner.advance();
+ return new Entry(key, value);
+ }
+
+ public void reset() throws IOException {
+ IOUtils.cleanup(LOG, scanner);
+ scanner = reader.createScanner();
+ }
+
+ public void close() {
+ IOUtils.cleanup(LOG, scanner, reader);
+ }
+
+ }
+
+ private class HistoryFileWriter {
+
+ private FSDataOutputStream fsdos;
+ private TFile.Writer writer;
+
+ public HistoryFileWriter(Path historyFile) throws IOException {
+ if (fs.exists(historyFile)) {
+ fsdos = fs.append(historyFile);
+ } else {
+ fsdos = fs.create(historyFile);
+ }
+ fs.setPermission(historyFile, HISTORY_FILE_UMASK);
+ writer =
+ new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
+ YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
+ YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
+ getConfig());
+ }
+
+ public synchronized void close() {
+ IOUtils.cleanup(LOG, writer, fsdos);
+ }
+
+ public synchronized void writeHistoryData(HistoryDataKey key, byte[] value)
+ throws IOException {
+ DataOutputStream dos = null;
+ try {
+ dos = writer.prepareAppendKey(-1);
+ key.write(dos);
+ } finally {
+ IOUtils.cleanup(LOG, dos);
+ }
+ try {
+ dos = writer.prepareAppendValue(value.length);
+ dos.write(value);
+ } finally {
+ IOUtils.cleanup(LOG, dos);
+ }
+ }
+
+ }
+
+ private static class HistoryDataKey implements Writable {
+
+ private String id;
+
+ private String suffix;
+
+ public HistoryDataKey() {
+ this(null, null);
+ }
+
+ public HistoryDataKey(String id, String suffix) {
+ this.id = id;
+ this.suffix = suffix;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(id);
+ out.writeUTF(suffix);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readUTF();
+ suffix = in.readUTF();
+ }
+ }
+}