You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/06 00:04:34 UTC
[27/27] hadoop git commit: YARN-3264. Created backing storage write
interface and a POC only FS based storage implementation. Contributed by
Vrushali C.
YARN-3264. Created backing storage write interface and a POC only FS based storage implementation. Contributed by Vrushali C.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/821b68d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/821b68d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/821b68d0
Branch: refs/heads/YARN-2928
Commit: 821b68d05d246fd57d7b7286eb2ccc075ed1eae8
Parents: 848acd5
Author: Zhijie Shen <zj...@apache.org>
Authored: Thu Mar 5 15:03:30 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu Mar 5 15:03:30 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../timelineservice/TimelineWriteResponse.java | 170 +++++++++++++++++++
.../hadoop/yarn/conf/YarnConfiguration.java | 2 +
.../distributedshell/TestDistributedShell.java | 56 +++++-
.../aggregator/TimelineAggregator.java | 43 +++--
.../storage/FileSystemTimelineWriterImpl.java | 144 ++++++++++++++++
.../storage/TimelineAggregationTrack.java | 28 +++
.../timelineservice/storage/TimelineWriter.java | 66 +++++++
.../aggregator/TestTimelineAggregator.java | 23 ---
.../TestFileSystemTimelineWriterImpl.java | 79 +++++++++
10 files changed, 574 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f5a60f7..df1061d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -26,6 +26,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3210. Refactored timeline aggregator according to new code
organization proposed in YARN-3166. (Li Lu via zjshen)
+ YARN-3264. Created backing storage write interface and a POC only FS based
+ storage implementation. (Vrushali C via zjshen)
+
IMPROVEMENTS
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
new file mode 100644
index 0000000..82ecdbd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A class that holds a list of put errors. This is the response returned when a
+ * list of {@link TimelineEntity} objects is added to the timeline. If there are errors
+ * in storing individual entity objects, they will be indicated in the list of
+ * errors.
+ */
+@XmlRootElement(name = "response")
+@XmlAccessorType(XmlAccessType.NONE)
+@Public
+@Unstable
+public class TimelineWriteResponse {
+
+ private List<TimelineWriteError> errors = new ArrayList<TimelineWriteError>();
+
+ public TimelineWriteResponse() {
+
+ }
+
+ /**
+ * Get a list of {@link TimelineWriteError} instances
+ *
+ * @return a list of {@link TimelineWriteError} instances
+ */
+ @XmlElement(name = "errors")
+ public List<TimelineWriteError> getErrors() {
+ return errors;
+ }
+
+ /**
+ * Add a single {@link TimelineWriteError} instance into the existing list
+ *
+ * @param error
+ * a single {@link TimelineWriteError} instance
+ */
+ public void addError(TimelineWriteError error) {
+ errors.add(error);
+ }
+
+ /**
+ * Add a list of {@link TimelineWriteError} instances into the existing list
+ *
+ * @param errors
+ * a list of {@link TimelineWriteError} instances
+ */
+ public void addErrors(List<TimelineWriteError> errors) {
+ this.errors.addAll(errors);
+ }
+
+ /**
+ * Set the list to the given list of {@link TimelineWriteError} instances
+ *
+ * @param errors
+ * a list of {@link TimelineWriteError} instances
+ */
+ public void setErrors(List<TimelineWriteError> errors) {
+ this.errors.clear();
+ this.errors.addAll(errors);
+ }
+
+ /**
+ * A class that holds the error code for one entity.
+ */
+ @XmlRootElement(name = "error")
+ @XmlAccessorType(XmlAccessType.NONE)
+ @Public
+ @Unstable
+ public static class TimelineWriteError {
+
+ /**
+ * Error code returned if an IOException is encountered when storing an
+ * entity.
+ */
+ public static final int IO_EXCEPTION = 1;
+
+ private String entityId;
+ private String entityType;
+ private int errorCode;
+
+ /**
+ * Get the entity Id
+ *
+ * @return the entity Id
+ */
+ @XmlElement(name = "entity")
+ public String getEntityId() {
+ return entityId;
+ }
+
+ /**
+ * Set the entity Id
+ *
+ * @param entityId
+ * the entity Id
+ */
+ public void setEntityId(String entityId) {
+ this.entityId = entityId;
+ }
+
+ /**
+ * Get the entity type
+ *
+ * @return the entity type
+ */
+ @XmlElement(name = "entitytype")
+ public String getEntityType() {
+ return entityType;
+ }
+
+ /**
+ * Set the entity type
+ *
+ * @param entityType
+ * the entity type
+ */
+ public void setEntityType(String entityType) {
+ this.entityType = entityType;
+ }
+
+ /**
+ * Get the error code
+ *
+ * @return an error code
+ */
+ @XmlElement(name = "errorcode")
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ /**
+ * Set the error code to the given error code
+ *
+ * @param errorCode
+ * an error code
+ */
+ public void setErrorCode(int errorCode) {
+ this.errorCode = errorCode;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 25b808e..57fc378 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1253,6 +1253,8 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_PREFIX =
YARN_PREFIX + "timeline-service.";
+ public static final String TIMELINE_SERVICE_WRITER_CLASS =
+ TIMELINE_SERVICE_PREFIX + "writer.class";
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 313dc97..ef69e4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -237,6 +239,7 @@ public class TestDistributedShell {
boolean verified = false;
String errorMessage = "";
+ ApplicationId appId = null;
while(!verified) {
List<ApplicationReport> apps = yarnClient.getApplications();
if (apps.size() == 0 ) {
@@ -244,6 +247,7 @@ public class TestDistributedShell {
continue;
}
ApplicationReport appReport = apps.get(0);
+ appId = appReport.getApplicationId();
if(appReport.getHost().equals("N/A")) {
Thread.sleep(10);
continue;
@@ -267,7 +271,7 @@ public class TestDistributedShell {
if (!isTestingTimelineV2) {
checkTimelineV1(haveDomain);
} else {
- checkTimelineV2(haveDomain);
+ checkTimelineV2(haveDomain, appId);
}
}
@@ -316,8 +320,54 @@ public class TestDistributedShell {
}
}
- private void checkTimelineV2(boolean haveDomain) {
- // TODO check timeline V2 here after we have a storage layer
+ private void checkTimelineV2(boolean haveDomain, ApplicationId appId) {
+ // For PoC check in /tmp/ YARN-3264
+ String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+
+ File tmpRootFolder = new File(tmpRoot);
+ Assert.assertTrue(tmpRootFolder.isDirectory());
+
+ // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
+ String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/";
+
+ File entityFolder = new File(outputDirApp);
+ Assert.assertTrue(entityFolder.isDirectory());
+
+ // there will be at least one attempt, look for that file
+ String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
+ + "_000" + appId.getId() + "_000001"
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ String appAttemptFileName = outputDirApp + appTimestampFileName;
+ File appAttemptFile = new File(appAttemptFileName);
+ Assert.assertTrue(appAttemptFile.exists());
+
+ String outputDirContainer = tmpRoot + "/DS_CONTAINER/";
+ File containerFolder = new File(outputDirContainer);
+ Assert.assertTrue(containerFolder.isDirectory());
+
+ String containerTimestampFileName = "container_"
+ + appId.getClusterTimestamp() + "_000" + appId.getId()
+ + "_01_000002.thist";
+ String containerFileName = outputDirContainer + containerTimestampFileName;
+ File containerFile = new File(containerFileName);
+ Assert.assertTrue(containerFile.exists());
+ String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+ + "_";
+ deleteAppFiles(new File(outputDirApp), appTimeStamp);
+ deleteAppFiles(new File(outputDirContainer), appTimeStamp);
+ tmpRootFolder.delete();
+ }
+
+ private void deleteAppFiles(File rootDir, String appTimeStamp) {
+ boolean deleted = false;
+ File[] listOfFiles = rootDir.listFiles();
+ for (File f1 : listOfFiles) {
+ // list all attempts for this app and delete them
+ if (f1.getName().contains(appTimeStamp)){
+ deleted = f1.delete();
+ Assert.assertTrue(deleted);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
index 4227712..dbd0895 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,12 +28,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Service that handles writes to the timeline service and writes them to the
* backing storage.
*
- * Classes that extend this can putIfAbsent their own lifecycle management or
+ * Classes that extend this can add their own lifecycle management or
* customization of request handling.
*/
@Private
@@ -39,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
public abstract class TimelineAggregator extends CompositeService {
private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
+ private TimelineWriter writer;
+
public TimelineAggregator(String name) {
super(name);
}
@@ -46,6 +54,11 @@ public abstract class TimelineAggregator extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
+ writer = ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class,
+ TimelineWriter.class), conf);
+ writer.init(conf);
}
@Override
@@ -56,6 +69,11 @@ public abstract class TimelineAggregator extends CompositeService {
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
+ writer.stop();
+ }
+
+ public TimelineWriter getWriter() {
+ return writer;
}
/**
@@ -69,20 +87,17 @@ public abstract class TimelineAggregator extends CompositeService {
*
* @param entities entities to post
* @param callerUgi the caller UGI
+ * @return the response that contains the result of the post.
*/
- public void postEntities(TimelineEntities entities,
- UserGroupInformation callerUgi) {
- // Add this output temporarily for our prototype
- // TODO remove this after we have an actual implementation
- LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
- LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
- callerUgi + ")");
-
- // TODO implement
+ public TimelineWriteResponse postEntities(TimelineEntities entities,
+ UserGroupInformation callerUgi) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
- callerUgi + ")");
+ LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
+ LOG.debug("postEntities(entities=" + entities + ", callerUgi="
+ + callerUgi + ")");
}
+
+ return writer.write(entities);
}
/**
@@ -104,4 +119,4 @@ public abstract class TimelineAggregator extends CompositeService {
callerUgi + ")");
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
new file mode 100644
index 0000000..4a57e97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This implements a local file based backend for storing application timeline
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileSystemTimelineWriterImpl extends AbstractService
+ implements TimelineWriter {
+
+ private String outputRoot;
+
+ /** Config param for timeline service storage tmp root for FILE YARN-3264 */
+ public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+ /** default value for storage location on local disk */
+ public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ = "/tmp/timeline_service_data/";
+
+ /** Default extension for output files */
+ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+ FileSystemTimelineWriterImpl() {
+ super((FileSystemTimelineWriterImpl.class.getName()));
+ }
+
+ /**
+ * Stores the entire information in {@link TimelineEntity} to the
+ * timeline store. Any errors occurring for individual write request objects
+ * will be reported in the response.
+ *
+ * @param data
+ * a {@link TimelineEntity} object
+ * @return {@link TimelineWriteResponse} object.
+ * @throws IOException
+ */
+ @Override
+ public TimelineWriteResponse write(TimelineEntities entities)
+ throws IOException {
+ TimelineWriteResponse response = new TimelineWriteResponse();
+ for (TimelineEntity entity : entities.getEntities()) {
+ write(entity, response);
+ }
+ return response;
+ }
+
+ private void write(TimelineEntity entity,
+ TimelineWriteResponse response) throws IOException {
+ PrintWriter out = null;
+ try {
+ File outputDir = new File(outputRoot + entity.getType());
+ String fileName = outputDir + "/" + entity.getId()
+ + TIMELINE_SERVICE_STORAGE_EXTENSION;
+ if (!outputDir.exists()) {
+ if (!outputDir.mkdirs()) {
+ throw new IOException("Could not create directories for " + fileName);
+ }
+ }
+ out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
+ out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ out.write("\n");
+ } catch (IOException ioe) {
+ TimelineWriteError error = new TimelineWriteError();
+ error.setEntityId(entity.getId());
+ error.setEntityType(entity.getType());
+ /*
+ * TODO: set an appropriate error code after PoC could possibly be:
+ * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
+ */
+ response.addError(error);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ /**
+ * Aggregates the entity information to the timeline store based on which
+ * track this entity is to be rolled up to The tracks along which aggregations
+ * are to be done are given by {@link TimelineAggregationTrack}
+ *
+ * Any errors occurring for individual write request objects will be reported
+ * in the response.
+ *
+ * @param data
+ * a {@link TimelineEntity} object
+ * a {@link TimelineAggregationTrack} enum value
+ * @return a {@link TimelineWriteResponse} object.
+ * @throws IOException
+ */
+ public TimelineWriteResponse aggregate(TimelineEntity data,
+ TimelineAggregationTrack track) throws IOException {
+ return null;
+
+ }
+
+ public String getOutputRoot() {
+ return outputRoot;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
new file mode 100644
index 0000000..955ca80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+/**
+ * specifies the tracks along which an entity
+ * info is to be aggregated on
+ *
+ */
+public enum TimelineAggregationTrack {
+ FLOW, USER, QUEUE
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
new file mode 100644
index 0000000..71ad7ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.service.Service;
+
+/**
+ * This interface is for storing application timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineWriter extends Service {
+
+ /**
+ * Stores the entire information in {@link TimelineEntities} to the
+ * timeline store. Any errors occurring for individual write request objects
+ * will be reported in the response.
+ *
+ * @param data
+ * a {@link TimelineEntities} object.
+ * @return a {@link TimelineWriteResponse} object.
+ * @throws IOException
+ */
+ TimelineWriteResponse write(TimelineEntities data) throws IOException;
+
+ /**
+ * Aggregates the entity information to the timeline store based on which
+ * track this entity is to be rolled up to The tracks along which aggregations
+ * are to be done are given by {@link TimelineAggregationTrack}
+ *
+ * Any errors occurring for individual write request objects will be reported
+ * in the response.
+ *
+ * @param data
+ * a {@link TimelineEntity} object
+ * a {@link TimelineAggregationTrack} enum
+ * value.
+ * @return a {@link TimelineWriteResponse} object.
+ * @throws IOException
+ */
+ TimelineWriteResponse aggregate(TimelineEntity data,
+ TimelineAggregationTrack track) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
deleted file mode 100644
index 821e455..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-public class TestTimelineAggregator {
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
new file mode 100644
index 0000000..f720454
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Test;
+import org.apache.commons.io.FileUtils;
+
+public class TestFileSystemTimelineWriterImpl {
+
+ /**
+ * Unit test for PoC YARN 3264
+ * @throws Exception
+ */
+ @Test
+ public void testWriteEntityToFile() throws Exception {
+ String name = "unit_test_BaseAggregator_testWriteEntityToFile_"
+ + Long.toString(System.currentTimeMillis());
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entity = new TimelineEntity();
+ String id = "hello";
+ String type = "world";
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(1425016501000L);
+ entity.setModifiedTime(1425016502000L);
+ te.addEntity(entity);
+
+ FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl();
+ fsi.serviceInit(new Configuration());
+ fsi.write(te);
+
+ String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ Path path = Paths.get(fileName);
+ File f = new File(fileName);
+ assertTrue(f.exists() && !f.isDirectory());
+ List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+ // ensure there's only one entity + 1 new line
+ assertTrue(data.size() == 2);
+ String d = data.get(0);
+ // confirm the contents same as what was written
+ assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+
+ // delete the directory
+ File outputDir = new File(fsi.getOutputRoot());
+ FileUtils.deleteDirectory(outputDir);
+ assertTrue(!(f.exists()));
+ }
+}