You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by ro...@apache.org on 2019/09/05 10:09:54 UTC
[hadoop] 01/07: YARN-3879 [Storage implementation] Create HDFS backing storage implementation for ATS reads. Contributed by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
rohithsharmaks pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 66e1599761ed1fa80d564f06b4b90815f64fe93b
Author: Vrushali C <vr...@apache.org>
AuthorDate: Thu Oct 11 21:13:52 2018 -0700
YARN-3879 [Storage implementation] Create HDFS backing storage implementation for ATS reads. Contributed by Abhishek Modi.
---
.../storage/FileSystemTimelineReaderImpl.java | 108 +++++++++++----------
1 file changed, 55 insertions(+), 53 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index cea70af..d1b7e86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -41,6 +40,11 @@ import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -69,7 +73,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineReaderImpl.class);
- private String rootPath;
+ private FileSystem fs;
+ private Path rootPath;
+ private Path entitiesPath;
private static final String ENTITIES_DIR = "entities";
/** Default extension for output files. */
@@ -95,7 +101,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
@VisibleForTesting
String getRootPath() {
- return rootPath;
+ return rootPath.toString();
}
private static ObjectMapper mapper;
@@ -163,12 +169,12 @@ public class FileSystemTimelineReaderImpl extends AbstractService
if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info");
}
- String appFlowMappingFile = rootPath + File.separator + ENTITIES_DIR +
- File.separator + clusterId + File.separator + APP_FLOW_MAPPING_FILE;
+ Path clusterIdPath = new Path(entitiesPath, clusterId);
+ Path appFlowMappingFilePath = new Path(clusterIdPath,
+ APP_FLOW_MAPPING_FILE);
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(
- new FileInputStream(
- appFlowMappingFile), Charset.forName("UTF-8")));
+ fs.open(appFlowMappingFilePath), Charset.forName("UTF-8")));
CSVParser parser = new CSVParser(reader, csvFormat)) {
for (CSVRecord record : parser.getRecords()) {
if (record.size() < 4) {
@@ -267,7 +273,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
return entity;
}
- private Set<TimelineEntity> getEntities(File dir, String entityType,
+ private Set<TimelineEntity> getEntities(Path dir, String entityType,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException {
// First sort the selected entities based on created/start time.
@@ -281,15 +287,18 @@ public class FileSystemTimelineReaderImpl extends AbstractService
}
);
if (dir != null) {
- File[] files = dir.listFiles();
- if (files != null) {
- for (File entityFile : files) {
+ RemoteIterator<LocatedFileStatus> fileStatuses = fs.listFiles(dir,
+ false);
+ if (fileStatuses != null) {
+ while (fileStatuses.hasNext()) {
+ LocatedFileStatus locatedFileStatus = fileStatuses.next();
+ Path entityFile = locatedFileStatus.getPath();
if (!entityFile.getName()
.contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
continue;
}
try (BufferedReader reader = new BufferedReader(
- new InputStreamReader(new FileInputStream(entityFile),
+ new InputStreamReader(fs.open(entityFile),
Charset.forName("UTF-8")))) {
TimelineEntity entity = readEntityFromFile(reader);
if (!entity.getType().equals(entityType)) {
@@ -367,25 +376,30 @@ public class FileSystemTimelineReaderImpl extends AbstractService
@Override
public void serviceInit(Configuration conf) throws Exception {
- rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
+ rootPath = new Path(outputRoot);
+ entitiesPath = new Path(rootPath, ENTITIES_DIR);
+ fs = rootPath.getFileSystem(conf);
super.serviceInit(conf);
}
@Override
public TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException {
- String flowRunPath = getFlowRunPath(context.getUserId(),
+ String flowRunPathStr = getFlowRunPath(context.getUserId(),
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
- File dir = new File(new File(rootPath, ENTITIES_DIR),
- context.getClusterId() + File.separator + flowRunPath + File.separator +
- context.getAppId() + File.separator + context.getEntityType());
- File entityFile = new File(
- dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+ Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+ Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+ Path appIdPath = new Path(flowRunPath, context.getAppId());
+ Path entityTypePath = new Path(appIdPath, context.getEntityType());
+ Path entityFilePath = new Path(entityTypePath,
+ context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(
- new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+ fs.open(entityFilePath), Charset.forName("UTF-8")))) {
TimelineEntity entity = readEntityFromFile(reader);
return createEntityToBeReturned(
entity, dataToRetrieve.getFieldsToRetrieve());
@@ -400,56 +414,44 @@ public class FileSystemTimelineReaderImpl extends AbstractService
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException {
- String flowRunPath = getFlowRunPath(context.getUserId(),
+ String flowRunPathStr = getFlowRunPath(context.getUserId(),
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
- File dir =
- new File(new File(rootPath, ENTITIES_DIR),
- context.getClusterId() + File.separator + flowRunPath +
- File.separator + context.getAppId() + File.separator +
- context.getEntityType());
- return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
+ Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+ Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+ Path appIdPath = new Path(flowRunPath, context.getAppId());
+ Path entityTypePath = new Path(appIdPath, context.getEntityType());
+
+ return getEntities(entityTypePath, context.getEntityType(), filters,
+ dataToRetrieve);
}
@Override public Set<String> getEntityTypes(TimelineReaderContext context)
throws IOException {
Set<String> result = new TreeSet<>();
- String flowRunPath = getFlowRunPath(context.getUserId(),
+ String flowRunPathStr = getFlowRunPath(context.getUserId(),
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
- File dir = new File(new File(rootPath, ENTITIES_DIR),
- context.getClusterId() + File.separator + flowRunPath
- + File.separator + context.getAppId());
- File[] fileList = dir.listFiles();
- if (fileList != null) {
- for (File f : fileList) {
- if (f.isDirectory()) {
- result.add(f.getName());
- }
+ Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+ Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+ Path appIdPath = new Path(flowRunPath, context.getAppId());
+ FileStatus[] fileStatuses = fs.listStatus(appIdPath);
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isDirectory()) {
+ result.add(fileStatus.getPath().getName());
}
}
return result;
}
- @Override
- public TimelineHealth getHealthStatus() {
+ @Override public TimelineHealth getHealthStatus() {
try {
- File file = new File(rootPath);
- if (file.exists()) {
- return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING,
- "");
- } else {
- return new TimelineHealth(
- TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
- "Root path \"" + rootPath + "\" does not exist"
- );
- }
- } catch (Exception e) {
+ fs.exists(rootPath);
+ } catch (IOException e) {
return new TimelineHealth(
TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
- e.getMessage()
- );
+ e.getMessage());
}
-
+ return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, "");
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org