You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/09/22 20:02:31 UTC
[21/22] AMBARI-5707. Metrics system prototype implementation. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
new file mode 100644
index 0000000..590853a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryReader {
+
+ /**
+ * This method returns Application {@link ApplicationHistoryData} for the
+ * specified {@link ApplicationId}.
+ *
+ * @param appId
+ * the id of the application whose history data is requested
+ *
+ * @return {@link ApplicationHistoryData} for the ApplicationId.
+ * @throws IOException if the underlying store cannot be read
+ */
+ ApplicationHistoryData getApplication(ApplicationId appId) throws IOException;
+
+ /**
+ * This method returns all Application {@link ApplicationHistoryData}s
+ *
+ * @return map of {@link ApplicationId} to {@link ApplicationHistoryData}s.
+ * @throws IOException if the underlying store cannot be read
+ */
+ Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+ throws IOException;
+
+ /**
+ * An Application can have multiple application attempts
+ * {@link ApplicationAttemptHistoryData}. This method returns all
+ * {@link ApplicationAttemptHistoryData}s for the Application.
+ *
+ * @param appId
+ * the id of the application whose attempts are requested
+ *
+ * @return all {@link ApplicationAttemptHistoryData}s for the Application.
+ * @throws IOException if the underlying store cannot be read
+ */
+ Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+ getApplicationAttempts(ApplicationId appId) throws IOException;
+
+ /**
+ * This method returns {@link ApplicationAttemptHistoryData} for the
+ * specified {@link ApplicationAttemptId}.
+ *
+ * @param appAttemptId
+ * {@link ApplicationAttemptId}
+ * @return {@link ApplicationAttemptHistoryData} for ApplicationAttemptId
+ * @throws IOException if the underlying store cannot be read
+ */
+ ApplicationAttemptHistoryData getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws IOException;
+
+ /**
+ * This method returns {@link ContainerHistoryData} for the specified
+ * {@link ContainerId}.
+ *
+ * @param containerId
+ * {@link ContainerId}
+ * @return {@link ContainerHistoryData} for ContainerId
+ * @throws IOException if the underlying store cannot be read
+ */
+ ContainerHistoryData getContainer(ContainerId containerId) throws IOException;
+
+ /**
+ * This method returns the AM container's {@link ContainerHistoryData} for
+ * the specified {@link ApplicationAttemptId}.
+ *
+ * @param appAttemptId
+ * {@link ApplicationAttemptId}
+ * @return {@link ContainerHistoryData} for ApplicationAttemptId
+ * @throws IOException if the underlying store cannot be read
+ */
+ ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException;
+
+ /**
+ * This method returns a map of {@link ContainerId} to
+ * {@link ContainerHistoryData} for the specified
+ * {@link ApplicationAttemptId}.
+ *
+ * @param appAttemptId
+ * {@link ApplicationAttemptId}
+ * @return map of {@link ContainerId} to {@link ContainerHistoryData} for
+ * ApplicationAttemptId
+ * @throws IOException if the underlying store cannot be read
+ */
+ Map<ContainerId, ContainerHistoryData> getContainers(
+ ApplicationAttemptId appAttemptId) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
new file mode 100644
index 0000000..f622153
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * History server that keeps track of all types of history in the cluster.
+ * Application specific history to start with. Also hosts the timeline store
+ * and the timeline metric store, exposing both through a single web
+ * application ({@code AHSWebApp}).
+ */
+public class ApplicationHistoryServer extends CompositeService {
+
+ /** Priority of the JVM shutdown hook that stops this composite service. */
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+ private static final Log LOG = LogFactory
+ .getLog(ApplicationHistoryServer.class);
+
+ // Package-private so tests can inspect the sub-services directly.
+ ApplicationHistoryClientService ahsClientService;
+ ApplicationHistoryManager historyManager;
+ TimelineStore timelineStore;
+ TimelineMetricStore timelineMetricStore;
+ private WebApp webApp;
+
+ public ApplicationHistoryServer() {
+ super(ApplicationHistoryServer.class.getName());
+ }
+
+ /**
+ * Builds the sub-service graph: history manager + its client service first,
+ * then the timeline store and timeline metric store (each added only if it
+ * is itself a {@link Service}).
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ historyManager = createApplicationHistory();
+ ahsClientService = createApplicationHistoryClientService(historyManager);
+ addService(ahsClientService);
+ addService((Service) historyManager);
+ timelineStore = createTimelineStore(conf);
+ timelineMetricStore = createTimelineMetricStore(conf);
+ addIfService(timelineStore);
+ addIfService(timelineMetricStore);
+ super.serviceInit(conf);
+ }
+
+ /** Initializes the metrics system and web app before starting children. */
+ @Override
+ protected void serviceStart() throws Exception {
+ DefaultMetricsSystem.initialize("ApplicationHistoryServer");
+ JvmMetrics.initSingleton("ApplicationHistoryServer", null);
+
+ startWebApp();
+ super.serviceStart();
+ }
+
+ /** Stops the web app (if started) and metrics before stopping children. */
+ @Override
+ protected void serviceStop() throws Exception {
+ if (webApp != null) {
+ webApp.stop();
+ }
+
+ DefaultMetricsSystem.shutdown();
+ super.serviceStop();
+ }
+
+ /** @return the RPC client service, exposed for tests. */
+ @Private
+ @VisibleForTesting
+ public ApplicationHistoryClientService getClientService() {
+ return this.ahsClientService;
+ }
+
+ protected ApplicationHistoryClientService
+ createApplicationHistoryClientService(
+ ApplicationHistoryManager historyManager) {
+ return new ApplicationHistoryClientService(historyManager);
+ }
+
+ protected ApplicationHistoryManager createApplicationHistory() {
+ return new ApplicationHistoryManagerImpl();
+ }
+
+ protected ApplicationHistoryManager getApplicationHistory() {
+ return this.historyManager;
+ }
+
+ /**
+ * Creates, initializes and starts a server instance. On any failure the
+ * JVM is terminated via {@link ExitUtil#terminate(int, String)}.
+ *
+ * @param args command-line arguments, used for the startup message only
+ * @return the started server, or an uninitialized/null reference if
+ * startup failed before termination took effect
+ */
+ static ApplicationHistoryServer launchAppHistoryServer(String[] args) {
+ Thread
+ .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(ApplicationHistoryServer.class, args,
+ LOG);
+ ApplicationHistoryServer appHistoryServer = null;
+ try {
+ appHistoryServer = new ApplicationHistoryServer();
+ ShutdownHookManager.get().addShutdownHook(
+ new CompositeServiceShutdownHook(appHistoryServer),
+ SHUTDOWN_HOOK_PRIORITY);
+ YarnConfiguration conf = new YarnConfiguration();
+ appHistoryServer.init(conf);
+ appHistoryServer.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting ApplicationHistoryServer", t);
+ ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer");
+ }
+ return appHistoryServer;
+ }
+
+ public static void main(String[] args) {
+ launchAppHistoryServer(args);
+ }
+
+ // NOTE(review): this overload appears unused — serviceInit() calls the
+ // no-arg createApplicationHistory() instead. Confirm no subclass relies on
+ // it before removing, or route serviceInit through it for consistency.
+ protected ApplicationHistoryManager createApplicationHistoryManager(
+ Configuration conf) {
+ return new ApplicationHistoryManagerImpl();
+ }
+
+ /** Timeline store implementation is pluggable via TIMELINE_SERVICE_STORE. */
+ protected TimelineStore createTimelineStore(
+ Configuration conf) {
+ return ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
+ TimelineStore.class), conf);
+ }
+
+ // NOTE(review): unlike the timeline store above, the metric store class is
+ // hard-coded here rather than configurable — confirm this is intentional.
+ protected TimelineMetricStore createTimelineMetricStore(Configuration conf) {
+ LOG.info("Creating metrics store.");
+ return ReflectionUtils.newInstance(HBaseTimelineMetricStore.class, conf);
+ }
+
+ /**
+ * Starts the AHS web application bound to the configured address; wraps
+ * any startup failure in a {@link YarnRuntimeException}.
+ */
+ protected void startWebApp() {
+ String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(getConfig());
+ LOG.info("Instantiating AHSWebApp at " + bindAddress);
+ try {
+ webApp =
+ WebApps
+ .$for("applicationhistory", ApplicationHistoryClientService.class,
+ ahsClientService, "ws")
+ .with(getConfig())
+ .withHttpSpnegoPrincipalKey(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY)
+ .withHttpSpnegoKeytabKey(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
+ .at(bindAddress)
+ .start(new AHSWebApp(historyManager, timelineStore, timelineMetricStore));
+ } catch (Exception e) {
+ String msg = "AHSWebApp failed to start.";
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+ /**
+ * @return ApplicationTimelineStore
+ */
+ @Private
+ @VisibleForTesting
+ public TimelineStore getTimelineStore() {
+ return timelineStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
new file mode 100644
index 0000000..c26faef
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+/**
+ * Abstraction over the storage of application history data. It is a
+ * {@link Service}, so that an implementation can use the service life cycle
+ * to initialize and clean up the storage. Users access the storage via the
+ * {@link ApplicationHistoryReader} and {@link ApplicationHistoryWriter}
+ * interfaces.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ApplicationHistoryStore extends Service,
+ ApplicationHistoryReader, ApplicationHistoryWriter {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
new file mode 100644
index 0000000..09ba36d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * Interface for writing application history. It exposes methods for
+ * recording {@link ApplicationStartData}, {@link ApplicationFinishData},
+ * {@link ApplicationAttemptStartData}, {@link ApplicationAttemptFinishData},
+ * {@link ContainerStartData} and {@link ContainerFinishData}.
+ */
+@Private
+@Unstable
+public interface ApplicationHistoryWriter {
+
+ /**
+ * This method writes the information of <code>RMApp</code> that is available
+ * when it starts.
+ *
+ * @param appStart
+ * the record of the information of <code>RMApp</code> that is
+ * available when it starts
+ * @throws IOException if the record cannot be written
+ */
+ void applicationStarted(ApplicationStartData appStart) throws IOException;
+
+ /**
+ * This method writes the information of <code>RMApp</code> that is available
+ * when it finishes.
+ *
+ * @param appFinish
+ * the record of the information of <code>RMApp</code> that is
+ * available when it finishes
+ * @throws IOException if the record cannot be written
+ */
+ void applicationFinished(ApplicationFinishData appFinish) throws IOException;
+
+ /**
+ * This method writes the information of <code>RMAppAttempt</code> that is
+ * available when it starts.
+ *
+ * @param appAttemptStart
+ * the record of the information of <code>RMAppAttempt</code> that is
+ * available when it starts
+ * @throws IOException if the record cannot be written
+ */
+ void applicationAttemptStarted(ApplicationAttemptStartData appAttemptStart)
+ throws IOException;
+
+ /**
+ * This method writes the information of <code>RMAppAttempt</code> that is
+ * available when it finishes.
+ *
+ * @param appAttemptFinish
+ * the record of the information of <code>RMAppAttempt</code> that is
+ * available when it finishes
+ * @throws IOException if the record cannot be written
+ */
+ void
+ applicationAttemptFinished(ApplicationAttemptFinishData appAttemptFinish)
+ throws IOException;
+
+ /**
+ * This method writes the information of <code>RMContainer</code> that is
+ * available when it starts.
+ *
+ * @param containerStart
+ * the record of the information of <code>RMContainer</code> that is
+ * available when it starts
+ * @throws IOException if the record cannot be written
+ */
+ void containerStarted(ContainerStartData containerStart) throws IOException;
+
+ /**
+ * This method writes the information of <code>RMContainer</code> that is
+ * available when it finishes.
+ *
+ * @param containerFinish
+ * the record of the information of <code>RMContainer</code> that is
+ * available when it finishes
+ * @throws IOException if the record cannot be written
+ */
+ void containerFinished(ContainerFinishData containerFinish)
+ throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
new file mode 100644
index 0000000..4c8d745
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java
@@ -0,0 +1,784 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * File system implementation of {@link ApplicationHistoryStore}. In this
+ * implementation, one application will have just one file in the file system,
+ * which contains all the history data of one application, and its attempts and
+ * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to
+ * be invoked first when writing any history data of one application and it will
+ * open a file, while {@link #applicationFinished(ApplicationFinishData)} is
+ * supposed to be last writing operation and will close the file.
+ */
+@Public
+@Unstable
+public class FileSystemApplicationHistoryStore extends AbstractService
+ implements ApplicationHistoryStore {
+
+ private static final Log LOG = LogFactory
+ .getLog(FileSystemApplicationHistoryStore.class);
+
+ // Directory (under the configured store URI) holding one file per app.
+ private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+ // Presumably the minimum TFile block size for history files — used by the
+ // writer code outside this excerpt; TODO confirm.
+ private static final int MIN_BLOCK_SIZE = 256 * 1024;
+ // Key suffixes distinguishing start records from finish records.
+ private static final String START_DATA_SUFFIX = "_start";
+ private static final String FINISH_DATA_SUFFIX = "_finish";
+ // Octal permissions: rwxr----- for the root dir, rw-r----- for files.
+ private static final FsPermission ROOT_DIR_UMASK = FsPermission
+ .createImmutable((short) 0740);
+ private static final FsPermission HISTORY_FILE_UMASK = FsPermission
+ .createImmutable((short) 0640);
+
+ private FileSystem fs;
+ private Path rootDirPath;
+
+ // Writers for applications that have started but not yet finished;
+ // closed in bulk during serviceStop().
+ private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters =
+ new ConcurrentHashMap<ApplicationId, HistoryFileWriter>();
+
+ public FileSystemApplicationHistoryStore() {
+ super(FileSystemApplicationHistoryStore.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ // Root of the store under the configured URI. Note: if
+ // FS_APPLICATION_HISTORY_STORE_URI is unset, conf.get() returns null and
+ // the Path constructor throws — there is no explicit config validation.
+ Path fsWorkingPath =
+ new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI));
+ rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ try {
+ // Create the root dir (idempotent) and restrict its permissions.
+ fs = fsWorkingPath.getFileSystem(conf);
+ fs.mkdirs(rootDirPath);
+ fs.setPermission(rootDirPath, ROOT_DIR_UMASK);
+ } catch (IOException e) {
+ // Log for operator visibility, then rethrow so the service fails init.
+ LOG.error("Error when initializing FileSystemHistoryStorage", e);
+ throw e;
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ try {
+ // Close every writer still open for an unfinished application.
+ // NOTE(review): if one close() throws, the remaining writers in the
+ // iteration are never closed (only the fs cleanup below still runs).
+ for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters
+ .entrySet()) {
+ entry.getValue().close();
+ }
+ outstandingWriters.clear();
+ } finally {
+ // Best-effort close of the FileSystem handle, even on writer failure.
+ IOUtils.cleanup(LOG, fs);
+ }
+ super.serviceStop();
+ }
+
+ /**
+ * Reads the application's history file and merges its start and finish
+ * records into one {@link ApplicationHistoryData}. Returns null when the
+ * file contains neither record for this appId; logs a warning when only
+ * one of the two is present. The reader is always closed.
+ */
+ @Override
+ public ApplicationHistoryData getApplication(ApplicationId appId)
+ throws IOException {
+ HistoryFileReader hfReader = getHistoryFileReader(appId);
+ try {
+ boolean readStartData = false;
+ boolean readFinishData = false;
+ // Placeholder record; fields are filled in by the merge calls below.
+ ApplicationHistoryData historyData =
+ ApplicationHistoryData.newInstance(appId, null, null, null, null,
+ Long.MIN_VALUE, Long.MIN_VALUE, Long.MAX_VALUE, null,
+ FinalApplicationStatus.UNDEFINED, null);
+ // Stop scanning as soon as both the start and finish records are found.
+ while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.equals(appId.toString())) {
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ ApplicationStartData startData =
+ parseApplicationStartData(entry.value);
+ mergeApplicationHistoryData(historyData, startData);
+ readStartData = true;
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ ApplicationFinishData finishData =
+ parseApplicationFinishData(entry.value);
+ mergeApplicationHistoryData(historyData, finishData);
+ readFinishData = true;
+ }
+ }
+ }
+ if (!readStartData && !readFinishData) {
+ return null;
+ }
+ if (!readStartData) {
+ LOG.warn("Start information is missing for application " + appId);
+ }
+ if (!readFinishData) {
+ LOG.warn("Finish information is missing for application " + appId);
+ }
+ LOG.info("Completed reading history information of application " + appId);
+ return historyData;
+ } catch (IOException e) {
+ LOG.error("Error when reading history file of application " + appId);
+ throw e;
+ } finally {
+ hfReader.close();
+ }
+ }
+
+ /**
+ * Lists every entry under the root directory (one file per application)
+ * and loads each application's merged history. A failure reading one
+ * application is deliberately logged and swallowed so the remaining
+ * applications are still returned.
+ */
+ @Override
+ public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+ throws IOException {
+ Map<ApplicationId, ApplicationHistoryData> historyDataMap =
+ new HashMap<ApplicationId, ApplicationHistoryData>();
+ FileStatus[] files = fs.listStatus(rootDirPath);
+ for (FileStatus file : files) {
+ // File name is the string form of the ApplicationId.
+ ApplicationId appId =
+ ConverterUtils.toApplicationId(file.getPath().getName());
+ try {
+ ApplicationHistoryData historyData = getApplication(appId);
+ if (historyData != null) {
+ historyDataMap.put(appId, historyData);
+ }
+ } catch (IOException e) {
+ // Eat the exception not to disturb the getting the next
+ // ApplicationHistoryData
+ LOG.error("History information of application " + appId
+ + " is not included into the result due to the exception", e);
+ }
+ }
+ return historyDataMap;
+ }
+
+ /**
+ * Scans the application's history file and aggregates start/finish records
+ * of every attempt belonging to appId into a map keyed by attempt id.
+ * NOTE(review): an IOException during the scan is logged at INFO and
+ * swallowed, so a partially filled map may be returned with no error
+ * signal to the caller — confirm this is intentional.
+ */
+ @Override
+ public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+ getApplicationAttempts(ApplicationId appId) throws IOException {
+ Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
+ new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
+ HistoryFileReader hfReader = getHistoryFileReader(appId);
+ try {
+ while (hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ // Only attempt records are of interest; they carry the attempt prefix.
+ if (entry.key.id.startsWith(
+ ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
+ ApplicationAttemptId appAttemptId =
+ ConverterUtils.toApplicationAttemptId(entry.key.id);
+ if (appAttemptId.getApplicationId().equals(appId)) {
+ // First record for this attempt creates a placeholder entry.
+ ApplicationAttemptHistoryData historyData =
+ historyDataMap.get(appAttemptId);
+ if (historyData == null) {
+ historyData = ApplicationAttemptHistoryData.newInstance(
+ appAttemptId, null, -1, null, null, null,
+ FinalApplicationStatus.UNDEFINED, null);
+ historyDataMap.put(appAttemptId, historyData);
+ }
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ mergeApplicationAttemptHistoryData(historyData,
+ parseApplicationAttemptStartData(entry.value));
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ mergeApplicationAttemptHistoryData(historyData,
+ parseApplicationAttemptFinishData(entry.value));
+ }
+ }
+ }
+ }
+ LOG.info("Completed reading history information of all application"
+ + " attempts of application " + appId);
+ } catch (IOException e) {
+ LOG.info("Error when reading history information of some application"
+ + " attempts of application " + appId);
+ } finally {
+ hfReader.close();
+ }
+ return historyDataMap;
+ }
+
+ /**
+ * Reads the owning application's history file and merges this attempt's
+ * start and finish records into one {@link ApplicationAttemptHistoryData}.
+ * Returns null when neither record exists; warns when only one does.
+ */
+ @Override
+ public ApplicationAttemptHistoryData getApplicationAttempt(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ HistoryFileReader hfReader =
+ getHistoryFileReader(appAttemptId.getApplicationId());
+ try {
+ boolean readStartData = false;
+ boolean readFinishData = false;
+ // Placeholder record; fields are filled in by the merge calls below.
+ ApplicationAttemptHistoryData historyData =
+ ApplicationAttemptHistoryData.newInstance(appAttemptId, null, -1,
+ null, null, null, FinalApplicationStatus.UNDEFINED, null);
+ // Stop scanning once both the start and finish records are found.
+ while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.equals(appAttemptId.toString())) {
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ ApplicationAttemptStartData startData =
+ parseApplicationAttemptStartData(entry.value);
+ mergeApplicationAttemptHistoryData(historyData, startData);
+ readStartData = true;
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ ApplicationAttemptFinishData finishData =
+ parseApplicationAttemptFinishData(entry.value);
+ mergeApplicationAttemptHistoryData(historyData, finishData);
+ readFinishData = true;
+ }
+ }
+ }
+ if (!readStartData && !readFinishData) {
+ return null;
+ }
+ if (!readStartData) {
+ LOG.warn("Start information is missing for application attempt "
+ + appAttemptId);
+ }
+ if (!readFinishData) {
+ LOG.warn("Finish information is missing for application attempt "
+ + appAttemptId);
+ }
+ LOG.info("Completed reading history information of application attempt "
+ + appAttemptId);
+ return historyData;
+ } catch (IOException e) {
+ // NOTE(review): message literal lacks a trailing space before the id
+ // ("attempt" + appAttemptId run together) — fix in a code change.
+ LOG.error("Error when reading history file of application attempt"
+ + appAttemptId);
+ throw e;
+ } finally {
+ hfReader.close();
+ }
+ }
+
  /**
   * Reads the history of a single container by scanning the owning
   * application's history file until both the start and the finish record
   * of the container have been merged, or the file ends.
   *
   * @param containerId the container to look up
   * @return the merged container history, or null if no record was found
   * @throws IOException if the history file cannot be opened or read
   */
  @Override
  public ContainerHistoryData getContainer(ContainerId containerId)
      throws IOException {
    HistoryFileReader hfReader =
        getHistoryFileReader(containerId.getApplicationAttemptId()
            .getApplicationId());
    try {
      boolean readStartData = false;
      boolean readFinishData = false;
      // Sentinel values (MIN/MAX_VALUE) are overwritten by the merges below
      // when the corresponding record is found.
      ContainerHistoryData historyData =
          ContainerHistoryData
              .newInstance(containerId, null, null, null, Long.MIN_VALUE,
                  Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
      // Stop scanning early once both records have been seen.
      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
        HistoryFileReader.Entry entry = hfReader.next();
        if (entry.key.id.equals(containerId.toString())) {
          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
            ContainerStartData startData = parseContainerStartData(entry.value);
            mergeContainerHistoryData(historyData, startData);
            readStartData = true;
          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
            ContainerFinishData finishData =
                parseContainerFinishData(entry.value);
            mergeContainerHistoryData(historyData, finishData);
            readFinishData = true;
          }
        }
      }
      if (!readStartData && !readFinishData) {
        return null;
      }
      if (!readStartData) {
        LOG.warn("Start information is missing for container " + containerId);
      }
      if (!readFinishData) {
        LOG.warn("Finish information is missing for container " + containerId);
      }
      LOG.info("Completed reading history information of container "
          + containerId);
      return historyData;
    } catch (IOException e) {
      LOG.error("Error when reading history file of container " + containerId);
      throw e;
    } finally {
      hfReader.close();
    }
  }
+
+ @Override
+ public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+ throws IOException {
+ ApplicationAttemptHistoryData attemptHistoryData =
+ getApplicationAttempt(appAttemptId);
+ if (attemptHistoryData == null
+ || attemptHistoryData.getMasterContainerId() == null) {
+ return null;
+ }
+ return getContainer(attemptHistoryData.getMasterContainerId());
+ }
+
+ @Override
+ public Map<ContainerId, ContainerHistoryData> getContainers(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ Map<ContainerId, ContainerHistoryData> historyDataMap =
+ new HashMap<ContainerId, ContainerHistoryData>();
+ HistoryFileReader hfReader =
+ getHistoryFileReader(appAttemptId.getApplicationId());
+ try {
+ while (hfReader.hasNext()) {
+ HistoryFileReader.Entry entry = hfReader.next();
+ if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+ ContainerId containerId =
+ ConverterUtils.toContainerId(entry.key.id);
+ if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
+ ContainerHistoryData historyData =
+ historyDataMap.get(containerId);
+ if (historyData == null) {
+ historyData = ContainerHistoryData.newInstance(
+ containerId, null, null, null, Long.MIN_VALUE,
+ Long.MAX_VALUE, null, Integer.MAX_VALUE, null);
+ historyDataMap.put(containerId, historyData);
+ }
+ if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+ mergeContainerHistoryData(historyData,
+ parseContainerStartData(entry.value));
+ } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+ mergeContainerHistoryData(historyData,
+ parseContainerFinishData(entry.value));
+ }
+ }
+ }
+ }
+ LOG.info("Completed reading history information of all conatiners"
+ + " of application attempt " + appAttemptId);
+ } catch (IOException e) {
+ LOG.info("Error when reading history information of some containers"
+ + " of application attempt " + appAttemptId);
+ } finally {
+ hfReader.close();
+ }
+ return historyDataMap;
+ }
+
+ @Override
+ public void applicationStarted(ApplicationStartData appStart)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ outstandingWriters.get(appStart.getApplicationId());
+ if (hfWriter == null) {
+ Path applicationHistoryFile =
+ new Path(rootDirPath, appStart.getApplicationId().toString());
+ try {
+ hfWriter = new HistoryFileWriter(applicationHistoryFile);
+ LOG.info("Opened history file of application "
+ + appStart.getApplicationId());
+ } catch (IOException e) {
+ LOG.error("Error when openning history file of application "
+ + appStart.getApplicationId());
+ throw e;
+ }
+ outstandingWriters.put(appStart.getApplicationId(), hfWriter);
+ } else {
+ throw new IOException("History file of application "
+ + appStart.getApplicationId() + " is already opened");
+ }
+ assert appStart instanceof ApplicationStartDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId()
+ .toString(), START_DATA_SUFFIX),
+ ((ApplicationStartDataPBImpl) appStart).getProto().toByteArray());
+ LOG.info("Start information of application "
+ + appStart.getApplicationId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing start information of application "
+ + appStart.getApplicationId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void applicationFinished(ApplicationFinishData appFinish)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(appFinish.getApplicationId());
+ assert appFinish instanceof ApplicationFinishDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appFinish.getApplicationId()
+ .toString(), FINISH_DATA_SUFFIX),
+ ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
+ LOG.info("Finish information of application "
+ + appFinish.getApplicationId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing finish information of application "
+ + appFinish.getApplicationId());
+ throw e;
+ } finally {
+ hfWriter.close();
+ outstandingWriters.remove(appFinish.getApplicationId());
+ }
+ }
+
+ @Override
+ public void applicationAttemptStarted(
+ ApplicationAttemptStartData appAttemptStart) throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(appAttemptStart.getApplicationAttemptId()
+ .getApplicationId());
+ assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appAttemptStart
+ .getApplicationAttemptId().toString(), START_DATA_SUFFIX),
+ ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
+ .toByteArray());
+ LOG.info("Start information of application attempt "
+ + appAttemptStart.getApplicationAttemptId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing start information of application attempt "
+ + appAttemptStart.getApplicationAttemptId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void applicationAttemptFinished(
+ ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId()
+ .getApplicationId());
+ assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(appAttemptFinish
+ .getApplicationAttemptId().toString(), FINISH_DATA_SUFFIX),
+ ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
+ .toByteArray());
+ LOG.info("Finish information of application attempt "
+ + appAttemptFinish.getApplicationAttemptId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing finish information of application attempt "
+ + appAttemptFinish.getApplicationAttemptId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void containerStarted(ContainerStartData containerStart)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(containerStart.getContainerId()
+ .getApplicationAttemptId().getApplicationId());
+ assert containerStart instanceof ContainerStartDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(containerStart
+ .getContainerId().toString(), START_DATA_SUFFIX),
+ ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
+ LOG.info("Start information of container "
+ + containerStart.getContainerId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing start information of container "
+ + containerStart.getContainerId());
+ throw e;
+ }
+ }
+
+ @Override
+ public void containerFinished(ContainerFinishData containerFinish)
+ throws IOException {
+ HistoryFileWriter hfWriter =
+ getHistoryFileWriter(containerFinish.getContainerId()
+ .getApplicationAttemptId().getApplicationId());
+ assert containerFinish instanceof ContainerFinishDataPBImpl;
+ try {
+ hfWriter.writeHistoryData(new HistoryDataKey(containerFinish
+ .getContainerId().toString(), FINISH_DATA_SUFFIX),
+ ((ContainerFinishDataPBImpl) containerFinish).getProto().toByteArray());
+ LOG.info("Finish information of container "
+ + containerFinish.getContainerId() + " is written");
+ } catch (IOException e) {
+ LOG.error("Error when writing finish information of container "
+ + containerFinish.getContainerId());
+ }
+ }
+
+ private static ApplicationStartData parseApplicationStartData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ApplicationStartDataPBImpl(
+ ApplicationStartDataProto.parseFrom(value));
+ }
+
+ private static ApplicationFinishData parseApplicationFinishData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ApplicationFinishDataPBImpl(
+ ApplicationFinishDataProto.parseFrom(value));
+ }
+
+ private static ApplicationAttemptStartData parseApplicationAttemptStartData(
+ byte[] value) throws InvalidProtocolBufferException {
+ return new ApplicationAttemptStartDataPBImpl(
+ ApplicationAttemptStartDataProto.parseFrom(value));
+ }
+
+ private static ApplicationAttemptFinishData
+ parseApplicationAttemptFinishData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ApplicationAttemptFinishDataPBImpl(
+ ApplicationAttemptFinishDataProto.parseFrom(value));
+ }
+
+ private static ContainerStartData parseContainerStartData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ContainerStartDataPBImpl(
+ ContainerStartDataProto.parseFrom(value));
+ }
+
+ private static ContainerFinishData parseContainerFinishData(byte[] value)
+ throws InvalidProtocolBufferException {
+ return new ContainerFinishDataPBImpl(
+ ContainerFinishDataProto.parseFrom(value));
+ }
+
+ private static void mergeApplicationHistoryData(
+ ApplicationHistoryData historyData, ApplicationStartData startData) {
+ historyData.setApplicationName(startData.getApplicationName());
+ historyData.setApplicationType(startData.getApplicationType());
+ historyData.setQueue(startData.getQueue());
+ historyData.setUser(startData.getUser());
+ historyData.setSubmitTime(startData.getSubmitTime());
+ historyData.setStartTime(startData.getStartTime());
+ }
+
+ private static void mergeApplicationHistoryData(
+ ApplicationHistoryData historyData, ApplicationFinishData finishData) {
+ historyData.setFinishTime(finishData.getFinishTime());
+ historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+ historyData.setFinalApplicationStatus(finishData
+ .getFinalApplicationStatus());
+ historyData.setYarnApplicationState(finishData.getYarnApplicationState());
+ }
+
+ private static void mergeApplicationAttemptHistoryData(
+ ApplicationAttemptHistoryData historyData,
+ ApplicationAttemptStartData startData) {
+ historyData.setHost(startData.getHost());
+ historyData.setRPCPort(startData.getRPCPort());
+ historyData.setMasterContainerId(startData.getMasterContainerId());
+ }
+
+ private static void mergeApplicationAttemptHistoryData(
+ ApplicationAttemptHistoryData historyData,
+ ApplicationAttemptFinishData finishData) {
+ historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+ historyData.setTrackingURL(finishData.getTrackingURL());
+ historyData.setFinalApplicationStatus(finishData
+ .getFinalApplicationStatus());
+ historyData.setYarnApplicationAttemptState(finishData
+ .getYarnApplicationAttemptState());
+ }
+
+ private static void mergeContainerHistoryData(
+ ContainerHistoryData historyData, ContainerStartData startData) {
+ historyData.setAllocatedResource(startData.getAllocatedResource());
+ historyData.setAssignedNode(startData.getAssignedNode());
+ historyData.setPriority(startData.getPriority());
+ historyData.setStartTime(startData.getStartTime());
+ }
+
+ private static void mergeContainerHistoryData(
+ ContainerHistoryData historyData, ContainerFinishData finishData) {
+ historyData.setFinishTime(finishData.getFinishTime());
+ historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+ historyData.setContainerExitStatus(finishData.getContainerExitStatus());
+ historyData.setContainerState(finishData.getContainerState());
+ }
+
+ private HistoryFileWriter getHistoryFileWriter(ApplicationId appId)
+ throws IOException {
+ HistoryFileWriter hfWriter = outstandingWriters.get(appId);
+ if (hfWriter == null) {
+ throw new IOException("History file of application " + appId
+ + " is not opened");
+ }
+ return hfWriter;
+ }
+
+ private HistoryFileReader getHistoryFileReader(ApplicationId appId)
+ throws IOException {
+ Path applicationHistoryFile = new Path(rootDirPath, appId.toString());
+ if (!fs.exists(applicationHistoryFile)) {
+ throw new IOException("History file for application " + appId
+ + " is not found");
+ }
+ // The history file is still under writing
+ if (outstandingWriters.containsKey(appId)) {
+ throw new IOException("History file for application " + appId
+ + " is under writing");
+ }
+ return new HistoryFileReader(applicationHistoryFile);
+ }
+
+ private class HistoryFileReader {
+
+ private class Entry {
+
+ private HistoryDataKey key;
+ private byte[] value;
+
+ public Entry(HistoryDataKey key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
+ private TFile.Reader reader;
+ private TFile.Reader.Scanner scanner;
+
+ public HistoryFileReader(Path historyFile) throws IOException {
+ FSDataInputStream fsdis = fs.open(historyFile);
+ reader =
+ new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
+ getConfig());
+ reset();
+ }
+
+ public boolean hasNext() {
+ return !scanner.atEnd();
+ }
+
+ public Entry next() throws IOException {
+ TFile.Reader.Scanner.Entry entry = scanner.entry();
+ DataInputStream dis = entry.getKeyStream();
+ HistoryDataKey key = new HistoryDataKey();
+ key.readFields(dis);
+ dis = entry.getValueStream();
+ byte[] value = new byte[entry.getValueLength()];
+ dis.read(value);
+ scanner.advance();
+ return new Entry(key, value);
+ }
+
+ public void reset() throws IOException {
+ IOUtils.cleanup(LOG, scanner);
+ scanner = reader.createScanner();
+ }
+
+ public void close() {
+ IOUtils.cleanup(LOG, scanner, reader);
+ }
+
+ }
+
  /**
   * Append-only writer for a per-application history TFile. Instances are
   * tracked in {@code outstandingWriters} until the application finishes.
   */
  private class HistoryFileWriter {

    private FSDataOutputStream fsdos;
    private TFile.Writer writer;

    // Opens the history file, appending when it already exists (e.g. after
    // a service restart) and creating it otherwise; the restrictive
    // permission mask is re-applied on every open.
    public HistoryFileWriter(Path historyFile) throws IOException {
      if (fs.exists(historyFile)) {
        fsdos = fs.append(historyFile);
      } else {
        fsdos = fs.create(historyFile);
      }
      fs.setPermission(historyFile, HISTORY_FILE_UMASK);
      writer =
          new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
              YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
              getConfig());
    }

    // Closes the TFile writer and then the underlying stream.
    public synchronized void close() {
      IOUtils.cleanup(LOG, writer, fsdos);
    }

    // Appends one key/value record. TFile requires the key stream to be
    // closed before the value stream is opened; cleanup in finally
    // guarantees that even when a write fails mid-record.
    public synchronized void writeHistoryData(HistoryDataKey key, byte[] value)
        throws IOException {
      DataOutputStream dos = null;
      try {
        dos = writer.prepareAppendKey(-1); // -1: key length not known upfront
        key.write(dos);
      } finally {
        IOUtils.cleanup(LOG, dos);
      }
      try {
        dos = writer.prepareAppendValue(value.length);
        dos.write(value);
      } finally {
        IOUtils.cleanup(LOG, dos);
      }
    }

  }
+
  /**
   * Composite key of one record in a history file: the string id of the
   * entity (application, attempt, or container) plus a suffix that marks
   * the record as start or finish data. Serialized as two UTF strings, so
   * the wire format must not change.
   */
  private static class HistoryDataKey implements Writable {

    private String id;

    private String suffix;

    // No-arg constructor required for Writable deserialization.
    public HistoryDataKey() {
      this(null, null);
    }

    public HistoryDataKey(String id, String suffix) {
      this.id = id;
      this.suffix = suffix;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(id);
      out.writeUTF(suffix);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      id = in.readUTF();
      suffix = in.readUTF();
    }
  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
new file mode 100644
index 0000000..c226ad3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
+/**
+ * In-memory implementation of {@link ApplicationHistoryStore}. This
+ * implementation is for test purpose only. If users improperly instantiate it,
+ * they may encounter reading and writing history data in different memory
+ * store.
+ *
+ */
+@Private
+@Unstable
public class MemoryApplicationHistoryStore extends AbstractService implements
    ApplicationHistoryStore {

  // Application records keyed by application id.
  private final ConcurrentMap<ApplicationId, ApplicationHistoryData> applicationData =
      new ConcurrentHashMap<ApplicationId, ApplicationHistoryData>();
  // Attempt records grouped per owning application.
  private final ConcurrentMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData =
      new ConcurrentHashMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>();
  // Container records grouped per owning application attempt.
  private final ConcurrentMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>> containerData =
      new ConcurrentHashMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>>();

  public MemoryApplicationHistoryStore() {
    super(MemoryApplicationHistoryStore.class.getName());
  }

  // Returns a point-in-time snapshot; later updates are not reflected.
  @Override
  public Map<ApplicationId, ApplicationHistoryData> getAllApplications() {
    return new HashMap<ApplicationId, ApplicationHistoryData>(applicationData);
  }

  @Override
  public ApplicationHistoryData getApplication(ApplicationId appId) {
    return applicationData.get(appId);
  }

  // Snapshot of the attempts of one application; empty when unknown.
  @Override
  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
      getApplicationAttempts(ApplicationId appId) {
    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
        applicationAttemptData.get(appId);
    if (subMap == null) {
      return Collections
          .<ApplicationAttemptId, ApplicationAttemptHistoryData> emptyMap();
    } else {
      return new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>(
          subMap);
    }
  }

  @Override
  public ApplicationAttemptHistoryData getApplicationAttempt(
      ApplicationAttemptId appAttemptId) {
    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
        applicationAttemptData.get(appAttemptId.getApplicationId());
    if (subMap == null) {
      return null;
    } else {
      return subMap.get(appAttemptId);
    }
  }

  // Resolves the AM container of an attempt; null when the attempt or its
  // master container is unknown.
  @Override
  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) {
    ApplicationAttemptHistoryData appAttempt =
        getApplicationAttempt(appAttemptId);
    if (appAttempt == null || appAttempt.getMasterContainerId() == null) {
      return null;
    } else {
      return getContainer(appAttempt.getMasterContainerId());
    }
  }

  @Override
  public ContainerHistoryData getContainer(ContainerId containerId) {
    Map<ContainerId, ContainerHistoryData> subMap =
        containerData.get(containerId.getApplicationAttemptId());
    if (subMap == null) {
      return null;
    } else {
      return subMap.get(containerId);
    }
  }

  // Snapshot of the containers of one attempt; empty when unknown.
  @Override
  public Map<ContainerId, ContainerHistoryData> getContainers(
      ApplicationAttemptId appAttemptId) throws IOException {
    ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
        containerData.get(appAttemptId);
    if (subMap == null) {
      return Collections.<ContainerId, ContainerHistoryData> emptyMap();
    } else {
      return new HashMap<ContainerId, ContainerHistoryData>(subMap);
    }
  }

  // Records application start; rejects duplicates via putIfAbsent so
  // concurrent double-starts cannot silently overwrite each other.
  @Override
  public void applicationStarted(ApplicationStartData appStart)
      throws IOException {
    ApplicationHistoryData oldData =
        applicationData.putIfAbsent(appStart.getApplicationId(),
          ApplicationHistoryData.newInstance(appStart.getApplicationId(),
            appStart.getApplicationName(), appStart.getApplicationType(),
            appStart.getQueue(), appStart.getUser(), appStart.getSubmitTime(),
            appStart.getStartTime(), Long.MAX_VALUE, null, null, null));
    if (oldData != null) {
      throw new IOException("The start information of application "
          + appStart.getApplicationId() + " is already stored.");
    }
  }

  // Records application finish; requires a prior start record and rejects
  // a second finish.
  @Override
  public void applicationFinished(ApplicationFinishData appFinish)
      throws IOException {
    ApplicationHistoryData data =
        applicationData.get(appFinish.getApplicationId());
    if (data == null) {
      throw new IOException("The finish information of application "
          + appFinish.getApplicationId() + " is stored before the start"
          + " information.");
    }
    // Make the assumption that YarnApplicationState should not be null if
    // the finish information is already recorded
    if (data.getYarnApplicationState() != null) {
      throw new IOException("The finish information of application "
          + appFinish.getApplicationId() + " is already stored.");
    }
    data.setFinishTime(appFinish.getFinishTime());
    data.setDiagnosticsInfo(appFinish.getDiagnosticsInfo());
    data.setFinalApplicationStatus(appFinish.getFinalApplicationStatus());
    data.setYarnApplicationState(appFinish.getYarnApplicationState());
  }

  // Records attempt start; rejects duplicates via putIfAbsent.
  @Override
  public void applicationAttemptStarted(
      ApplicationAttemptStartData appAttemptStart) throws IOException {
    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
        getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId());
    ApplicationAttemptHistoryData oldData =
        subMap.putIfAbsent(appAttemptStart.getApplicationAttemptId(),
          ApplicationAttemptHistoryData.newInstance(
            appAttemptStart.getApplicationAttemptId(),
            appAttemptStart.getHost(), appAttemptStart.getRPCPort(),
            appAttemptStart.getMasterContainerId(), null, null, null, null));
    if (oldData != null) {
      throw new IOException("The start information of application attempt "
          + appAttemptStart.getApplicationAttemptId() + " is already stored.");
    }
  }

  // Records attempt finish; requires a prior start record and rejects a
  // second finish.
  @Override
  public void applicationAttemptFinished(
      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
    ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
        getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId());
    ApplicationAttemptHistoryData data =
        subMap.get(appAttemptFinish.getApplicationAttemptId());
    if (data == null) {
      throw new IOException("The finish information of application attempt "
          + appAttemptFinish.getApplicationAttemptId() + " is stored before"
          + " the start information.");
    }
    // Make the assumption that YarnApplicationAttemptState should not be null
    // if the finish information is already recorded
    if (data.getYarnApplicationAttemptState() != null) {
      throw new IOException("The finish information of application attempt "
          + appAttemptFinish.getApplicationAttemptId() + " is already stored.");
    }
    data.setTrackingURL(appAttemptFinish.getTrackingURL());
    data.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo());
    data
      .setFinalApplicationStatus(appAttemptFinish.getFinalApplicationStatus());
    data.setYarnApplicationAttemptState(appAttemptFinish
      .getYarnApplicationAttemptState());
  }

  // Returns (creating if needed) the per-application attempt map.
  private ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>
      getSubMap(ApplicationId appId) {
    applicationAttemptData
      .putIfAbsent(
        appId,
        new ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>());
    return applicationAttemptData.get(appId);
  }

  // Records container start; rejects duplicates via putIfAbsent.
  @Override
  public void containerStarted(ContainerStartData containerStart)
      throws IOException {
    ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
        getSubMap(containerStart.getContainerId().getApplicationAttemptId());
    ContainerHistoryData oldData =
        subMap.putIfAbsent(containerStart.getContainerId(),
          ContainerHistoryData.newInstance(containerStart.getContainerId(),
            containerStart.getAllocatedResource(),
            containerStart.getAssignedNode(), containerStart.getPriority(),
            containerStart.getStartTime(), Long.MAX_VALUE, null,
            Integer.MAX_VALUE, null));
    if (oldData != null) {
      throw new IOException("The start information of container "
          + containerStart.getContainerId() + " is already stored.");
    }
  }

  // Records container finish; requires a prior start record and rejects a
  // second finish.
  @Override
  public void containerFinished(ContainerFinishData containerFinish)
      throws IOException {
    ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
        getSubMap(containerFinish.getContainerId().getApplicationAttemptId());
    ContainerHistoryData data = subMap.get(containerFinish.getContainerId());
    if (data == null) {
      throw new IOException("The finish information of container "
          + containerFinish.getContainerId() + " is stored before"
          + " the start information.");
    }
    // Make the assumption that ContainerState should not be null if
    // the finish information is already recorded
    if (data.getContainerState() != null) {
      throw new IOException("The finish information of container "
          + containerFinish.getContainerId() + " is already stored.");
    }
    data.setFinishTime(containerFinish.getFinishTime());
    data.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo());
    data.setContainerExitStatus(containerFinish.getContainerExitStatus());
    data.setContainerState(containerFinish.getContainerState());
  }

  // Returns (creating if needed) the per-attempt container map.
  private ConcurrentMap<ContainerId, ContainerHistoryData> getSubMap(
      ApplicationAttemptId appAttemptId) {
    containerData.putIfAbsent(appAttemptId,
      new ConcurrentHashMap<ContainerId, ContainerHistoryData>());
    return containerData.get(appAttemptId);
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
new file mode 100644
index 0000000..3660c10
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+
/**
 * Dummy implementation of {@link ApplicationHistoryStore}. If this
 * implementation is used, no history data will be persisted: every write
 * operation is silently discarded, and every read returns either
 * {@code null} (single-record lookups) or an empty map (collection
 * lookups). Useful for disabling history persistence without changing
 * callers.
 */
@Unstable
@Private
public class NullApplicationHistoryStore extends AbstractService implements
    ApplicationHistoryStore {

  public NullApplicationHistoryStore() {
    super(NullApplicationHistoryStore.class.getName());
  }

  /** No-op: the application start event is discarded. */
  @Override
  public void applicationStarted(ApplicationStartData appStart)
      throws IOException {
  }

  /** No-op: the application finish event is discarded. */
  @Override
  public void applicationFinished(ApplicationFinishData appFinish)
      throws IOException {
  }

  /** No-op: the attempt start event is discarded. */
  @Override
  public void applicationAttemptStarted(
      ApplicationAttemptStartData appAttemptStart) throws IOException {
  }

  /** No-op: the attempt finish event is discarded. */
  @Override
  public void applicationAttemptFinished(
      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
  }

  /** No-op: the container start event is discarded. */
  @Override
  public void containerStarted(ContainerStartData containerStart)
      throws IOException {
  }

  /** No-op: the container finish event is discarded. */
  @Override
  public void containerFinished(ContainerFinishData containerFinish)
      throws IOException {
  }

  /** @return always {@code null}; nothing is ever stored. */
  @Override
  public ApplicationHistoryData getApplication(ApplicationId appId)
      throws IOException {
    return null;
  }

  /** @return always an empty map; nothing is ever stored. */
  @Override
  public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
      throws IOException {
    return Collections.emptyMap();
  }

  /** @return always an empty map; nothing is ever stored. */
  @Override
  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
      getApplicationAttempts(ApplicationId appId) throws IOException {
    return Collections.emptyMap();
  }

  /** @return always {@code null}; nothing is ever stored. */
  @Override
  public ApplicationAttemptHistoryData getApplicationAttempt(
      ApplicationAttemptId appAttemptId) throws IOException {
    return null;
  }

  /** @return always {@code null}; nothing is ever stored. */
  @Override
  public ContainerHistoryData getContainer(ContainerId containerId)
      throws IOException {
    return null;
  }

  /** @return always {@code null}; nothing is ever stored. */
  @Override
  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
      throws IOException {
    return null;
  }

  /** @return always an empty map; nothing is ever stored. */
  @Override
  public Map<ContainerId, ContainerHistoryData> getContainers(
      ApplicationAttemptId appAttemptId) throws IOException {
    return Collections.emptyMap();
  }

}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
new file mode 100644
index 0000000..e702fc0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+
+public abstract class AbstractTimelineAggregator implements Runnable {
+ protected final PhoenixHBaseAccessor hBaseAccessor;
+ protected final String CHECKPOINT_LOCATION;
+ private final Log LOG;
+ static final long checkpointDelay = 120000;
+ static final Integer RESULTSET_FETCH_SIZE = 5000;
+ private static final ObjectMapper mapper;
+
+ static {
+ //SimpleModule simpleModule = new SimpleModule("MetricAggregator", new Version(1, 0, 0, null));
+ //simpleModule.addSerializer(new MetricAggregateSerializer());
+ mapper = new ObjectMapper();
+ //mapper.registerModule(simpleModule);
+ }
+
+ public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ String checkpointLocation) {
+ this.hBaseAccessor = hBaseAccessor;
+ this.CHECKPOINT_LOCATION = checkpointLocation;
+ this.LOG = LogFactory.getLog(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Started Timeline aggregator thread @ " + new Date());
+ Long SLEEP_INTERVAL = getSleepInterval();
+
+ while (true) {
+ long currentTime = System.currentTimeMillis();
+ long lastCheckPointTime = -1;
+
+ try {
+ lastCheckPointTime = readCheckPoint();
+ if (isLastCheckPointTooOld(lastCheckPointTime)) {
+ LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+ "lastCheckPointTime = " + lastCheckPointTime);
+ lastCheckPointTime = -1;
+ }
+ if (lastCheckPointTime == -1) {
+ // Assuming first run, save checkpoint and sleep.
+ // Set checkpoint to 2 minutes in the past to allow the
+ // agents/collectors to catch up
+ saveCheckPoint(currentTime - checkpointDelay);
+ }
+ } catch (IOException io) {
+ LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
+ }
+
+ if (lastCheckPointTime != -1) {
+ LOG.info("Last check point time: " + lastCheckPointTime + ", " +
+ "lagBy: " + ((System.currentTimeMillis() - lastCheckPointTime)) / 1000);
+ boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
+ if (success) {
+ try {
+ saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+ } catch (IOException io) {
+ LOG.warn("Error saving checkpoint, restarting aggregation at " +
+ "previous checkpoint.");
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(SLEEP_INTERVAL);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted, continuing with aggregation.");
+ }
+ }
+ }
+
+ private boolean isLastCheckPointTooOld(long checkpoint) {
+ return checkpoint != -1 &&
+ ((System.currentTimeMillis() - checkpoint) > getCheckpointCutOffInterval());
+ }
+
+ private long readCheckPoint() {
+ try {
+ File checkpoint = new File(CHECKPOINT_LOCATION);
+ if (checkpoint.exists()) {
+ String contents = FileUtils.readFileToString(checkpoint);
+ if (contents != null && !contents.isEmpty()) {
+ return Long.parseLong(contents);
+ }
+ }
+ } catch (IOException io) {
+ LOG.debug(io);
+ }
+ return -1;
+ }
+
+ private void saveCheckPoint(long checkpointTime) throws IOException {
+ File checkpoint = new File(CHECKPOINT_LOCATION);
+ if (!checkpoint.exists()) {
+ boolean done = checkpoint.createNewFile();
+ if (!done) {
+ throw new IOException("Could not create checkpoint at location, " +
+ CHECKPOINT_LOCATION);
+ }
+ }
+ FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
+ }
+
+ // TODO: Abstract out doWork implementation for cluster and host levels
+ protected abstract boolean doWork(long startTime, long endTime);
+
+ protected abstract Long getSleepInterval();
+
+ protected abstract Long getCheckpointCutOffInterval();
+
+ @JsonSubTypes({ @JsonSubTypes.Type(value = MetricClusterAggregate.class),
+ @JsonSubTypes.Type(value = MetricHostAggregate.class) })
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ public static class MetricAggregate {
+ protected Double sum = 0.0;
+ protected Double deviation;
+ protected Double max = Double.MIN_VALUE;
+ protected Double min = Double.MAX_VALUE;
+
+ public MetricAggregate() {}
+
+ protected MetricAggregate(Double sum, Double deviation, Double max, Double min) {
+ this.sum = sum;
+ this.deviation = deviation;
+ this.max = max;
+ this.min = min;
+ }
+
+ void updateSum(Double sum) {
+ this.sum += sum;
+ }
+
+ void updateMax(Double max) {
+ if (max > this.max) {
+ this.max = max;
+ }
+ }
+
+ void updateMin(Double min) {
+ if (min < this.min) {
+ this.min = min;
+ }
+ }
+
+ @JsonProperty("sum")
+ Double getSum() {
+ return sum;
+ }
+
+ @JsonProperty("deviation")
+ Double getDeviation() {
+ return deviation;
+ }
+
+ @JsonProperty("max")
+ Double getMax() {
+ return max;
+ }
+
+ @JsonProperty("min")
+ Double getMin() {
+ return min;
+ }
+
+ public void setSum(Double sum) {
+ this.sum = sum;
+ }
+
+ public void setDeviation(Double deviation) {
+ this.deviation = deviation;
+ }
+
+ public void setMax(Double max) {
+ this.max = max;
+ }
+
+ public void setMin(Double min) {
+ this.min = min;
+ }
+
+ public String toJSON() throws IOException {
+ return mapper.writeValueAsString(this);
+ }
+ }
+
+ public static class MetricClusterAggregate extends MetricAggregate {
+ private int numberOfHosts;
+
+ @JsonCreator
+ public MetricClusterAggregate() {}
+
+ MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ @JsonProperty("numberOfHosts")
+ int getNumberOfHosts() {
+ return numberOfHosts;
+ }
+
+ void updateNumberOfHosts(int count) {
+ this.numberOfHosts += count;
+ }
+
+ public void setNumberOfHosts(int numberOfHosts) {
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ @Override
+ public String toString() {
+ return "MetricAggregate{" +
+ "sum=" + sum +
+ ", numberOfHosts=" + numberOfHosts +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+ }
+
+ /**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+ public static class MetricHostAggregate extends MetricAggregate {
+
+ @JsonCreator
+ public MetricHostAggregate() {
+ super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+ }
+
+ /**
+ * Find and update min, max and avg for a minute
+ */
+ void updateAggregates(MetricHostAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ }
+
+ /**
+ * Reuse sum to indicate average for a host for the hour
+ */
+ @Override
+ void updateSum(Double sum) {
+ this.sum = (this.sum + sum) / 2;
+ }
+
+ @Override
+ public String toString() {
+ return "MetricHostAggregate{" +
+ "sum=" + sum +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+ }
+}