You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/02/27 22:55:44 UTC
[hadoop] branch trunk updated: YARN-3841 [atsv2 Storage
implementation] Adding retry semantics to HDFS backing storage. Contributed
by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ea3cdc6 YARN-3841 [atsv2 Storage implementation] Adding retry semantics to HDFS backing storage. Contributed by Abhishek Modi.
ea3cdc6 is described below
commit ea3cdc60b39d96702c0bce292829914c25bc0d8e
Author: Vrushali C <vr...@apache.org>
AuthorDate: Wed Feb 27 14:55:35 2019 -0800
YARN-3841 [atsv2 Storage implementation] Adding retry semantics to HDFS backing storage. Contributed by Abhishek Modi.
---
.../storage/FileSystemTimelineWriterImpl.java | 217 ++++++++++++++++-----
.../storage/TestFileSystemTimelineWriterImpl.java | 51 +++--
2 files changed, 207 insertions(+), 61 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index ac0902f..c284f8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -18,16 +18,16 @@
package org.apache.hadoop.yarn.server.timelineservice.storage;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
@@ -35,14 +35,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
+import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This implements a local file based backend for storing application timeline
+ * This implements a FileSystem based backend for storing application timeline
* information. This implementation may not provide a complete implementation of
* all the necessary features. This implementation is provided solely for basic
* testing purposes, and should not be used in a non-test situation.
@@ -52,20 +55,36 @@ import com.google.common.annotations.VisibleForTesting;
public class FileSystemTimelineWriterImpl extends AbstractService
implements TimelineWriter {
- private String outputRoot;
-
/** Config param for timeline service storage tmp root for FILE YARN-3264. */
- public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
- = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+ public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+ public static final String TIMELINE_FS_WRITER_NUM_RETRIES =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.num-retries";
+ public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0;
+
+ public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX +
+ "fs-writer.retry-interval-ms";
+ public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = 1000L;
public static final String ENTITIES_DIR = "entities";
/** Default extension for output files. */
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+ private FileSystem fs;
+ private Path rootPath;
+ private int fsNumRetries;
+ private long fsRetryInterval;
+ private Path entitiesPath;
+
/** default value for storage location on local disk. */
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FileSystemTimelineWriter.class);
+
FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
}
@@ -83,8 +102,8 @@ public class FileSystemTimelineWriterImpl extends AbstractService
String appId = context.getAppId();
for (TimelineEntity entity : entities.getEntities()) {
- write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
- response);
+ writeInternal(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, entity, response);
}
return response;
}
@@ -97,59 +116,78 @@ public class FileSystemTimelineWriterImpl extends AbstractService
return null;
}
- private synchronized void write(String clusterId, String userId,
- String flowName, String flowVersion, long flowRun, String appId,
- TimelineEntity entity, TimelineWriteResponse response)
- throws IOException {
- PrintWriter out = null;
+ private synchronized void writeInternal(String clusterId, String userId,
+ String flowName, String flowVersion,
+ long flowRun, String appId,
+ TimelineEntity entity,
+ TimelineWriteResponse response)
+ throws IOException {
+ Path clusterIdPath = new Path(entitiesPath, clusterId);
+ Path userIdPath = new Path(clusterIdPath, userId);
+ Path flowNamePath = new Path(userIdPath, escape(flowName));
+ Path flowVersionPath = new Path(flowNamePath, escape(flowVersion));
+ Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun));
+ Path appIdPath = new Path(flowRunPath, appId);
+ Path entityTypePath = new Path(appIdPath, entity.getType());
try {
- String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
- escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
- entity.getType());
- String fileName = dir + entity.getId() +
- TIMELINE_SERVICE_STORAGE_EXTENSION;
- out =
- new PrintWriter(new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(fileName, true), "UTF-8")));
- out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
- out.write("\n");
- } catch (IOException ioe) {
- TimelineWriteError error = new TimelineWriteError();
- error.setEntityId(entity.getId());
- error.setEntityType(entity.getType());
+ mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath,
+ flowNamePath, flowVersionPath, flowRunPath, appIdPath,
+ entityTypePath);
+ Path filePath =
+ new Path(entityTypePath,
+ entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+ createFileWithRetries(filePath);
+
+ byte[] record = new StringBuilder()
+ .append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
+ .append("\n").toString().getBytes("UTF-8");
+ writeFileWithRetries(filePath, record);
+ } catch (Exception ioe) {
+ LOG.warn("Interrupted operation:" + ioe.getMessage());
+ TimelineWriteError error = createTimelineWriteError(entity);
/*
* TODO: set an appropriate error code after PoC could possibly be:
* error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
*/
response.addError(error);
- } finally {
- if (out != null) {
- out.close();
- }
}
}
+ private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
+ TimelineWriteError error = new TimelineWriteError();
+ error.setEntityId(entity.getId());
+ error.setEntityType(entity.getType());
+ return error;
+ }
+
@Override
public TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException {
return null;
-
}
@VisibleForTesting
String getOutputRoot() {
- return outputRoot;
+ return rootPath.toString();
}
@Override
public void serviceInit(Configuration conf) throws Exception {
- outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
+ rootPath = new Path(outputRoot);
+ entitiesPath = new Path(rootPath, ENTITIES_DIR);
+ fsNumRetries = conf.getInt(TIMELINE_FS_WRITER_NUM_RETRIES,
+ DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
+ fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS,
+ DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
+ fs = rootPath.getFileSystem(getConfig());
}
@Override
public void serviceStart() throws Exception {
- mkdirs(outputRoot, ENTITIES_DIR);
+ mkdirsWithRetries(rootPath);
+ mkdirsWithRetries(entitiesPath);
}
@Override
@@ -157,18 +195,103 @@ public class FileSystemTimelineWriterImpl extends AbstractService
// no op
}
- private static String mkdirs(String... dirStrs) throws IOException {
- StringBuilder path = new StringBuilder();
- for (String dirStr : dirStrs) {
- path.append(dirStr).append(File.separatorChar);
- File dir = new File(path.toString());
- if (!dir.exists()) {
- if (!dir.mkdirs()) {
- throw new IOException("Could not create directories for " + dir);
+ private void mkdirs(Path... paths) throws IOException, InterruptedException {
+ for (Path path: paths) {
+ if (!existsWithRetries(path)) {
+ mkdirsWithRetries(path);
+ }
+ }
+ }
+
+ // Code from FSRMStateStore.
+ private void mkdirsWithRetries(final Path dirPath)
+ throws IOException, InterruptedException {
+ new FSAction<Void>() {
+ @Override
+ public Void run() throws IOException {
+ fs.mkdirs(dirPath);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ private void writeFileWithRetries(final Path outputPath, final byte[] data)
+ throws Exception {
+ new FSAction<Void>() {
+ @Override
+ public Void run() throws IOException {
+ writeFile(outputPath, data);
+ return null;
+ }
+ }.runWithRetries();
+ }
+
+ private boolean createFileWithRetries(final Path newFile)
+ throws IOException, InterruptedException {
+ return new FSAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return createFile(newFile);
+ }
+ }.runWithRetries();
+ }
+
+ private boolean existsWithRetries(final Path path)
+ throws IOException, InterruptedException {
+ return new FSAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return fs.exists(path);
+ }
+ }.runWithRetries();
+ }
+
+ private abstract class FSAction<T> {
+ abstract T run() throws IOException;
+
+ T runWithRetries() throws IOException, InterruptedException {
+ int retry = 0;
+ while (true) {
+ try {
+ return run();
+ } catch (IOException e) {
+ LOG.info("Exception while executing a FS operation.", e);
+ if (++retry > fsNumRetries) {
+ LOG.info("Maxed out FS retries. Giving up!");
+ throw e;
+ }
+ LOG.info("Will retry operation on FS. Retry no. " + retry +
+ " after sleeping for " + fsRetryInterval + " seconds");
+ Thread.sleep(fsRetryInterval);
}
}
}
- return path.toString();
+ }
+
+ private boolean createFile(Path newFile) throws IOException {
+ return fs.createNewFile(newFile);
+ }
+
+ /**
+ * In order to make the write atomic, we first write the data to a .tmp
+ * file and then rename it to the output path. Here we are assuming that
+ * rename is atomic for the underlying file system.
+ */
+ protected void writeFile(Path outputPath, byte[] data) throws IOException {
+ Path tempPath =
+ new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
+ FSDataOutputStream fsOut = null;
+ // Create with overwrite=true so a stale .tmp file left behind by a
+ // previously failed write does not block this attempt.
+ try {
+ fsOut = fs.create(tempPath, true);
+ fsOut.write(data);
+ fsOut.close();
+ fsOut = null;
+ fs.rename(tempPath, outputPath);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, fsOut);
+ }
}
// specifically escape the separator character
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index bb9f54f..4073b85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -20,16 +20,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.BufferedReader;
import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -96,16 +99,20 @@ public class TestFileSystemTimelineWriterImpl {
"flow_version", 12345678L, "app_id"),
te, UserGroupInformation.createRemoteUser("user_id"));
- String fileName = fsi.getOutputRoot() + File.separator + "entities" +
+ String fileName = outputRoot + File.separator + "entities" +
File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" +
File.separator + type + File.separator + id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- Path path = Paths.get(fileName);
- File f = new File(fileName);
- assertTrue(f.exists() && !f.isDirectory());
- List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+ Path path = new Path(fileName);
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue("Specified path(" + fileName + ") should exist: ",
+ fs.exists(path));
+ FileStatus fileStatus = fs.getFileStatus(path);
+ assertTrue("Specified path should be a file",
+ !fileStatus.isDirectory());
+ List<String> data = readFromFile(fs, path);
// ensure there's only one entity + 1 new line
assertTrue("data size is:" + data.size(), data.size() == 2);
String d = data.get(0);
@@ -119,12 +126,15 @@ public class TestFileSystemTimelineWriterImpl {
File.separator + "12345678" + File.separator + "app_id" +
File.separator + type2 + File.separator + id2 +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- Path path2 = Paths.get(fileName2);
- File file = new File(fileName2);
- assertTrue(file.exists() && !file.isDirectory());
- List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+ Path path2 = new Path(fileName2);
+ assertTrue("Specified path(" + fileName + ") should exist: ",
+ fs.exists(path2));
+ FileStatus fileStatus2 = fs.getFileStatus(path2);
+ assertTrue("Specified path should be a file",
+ !fileStatus2.isDirectory());
+ List<String> data2 = readFromFile(fs, path2);
// ensure there's only one entity + 1 new line
- assertTrue("data size is:" + data.size(), data2.size() == 2);
+ assertTrue("data size is:" + data2.size(), data2.size() == 2);
String metricToString = data2.get(0);
// confirm the contents same as what was written
assertEquals(metricToString,
@@ -136,4 +146,17 @@ public class TestFileSystemTimelineWriterImpl {
}
}
+ private List<String> readFromFile(FileSystem fs, Path path)
+ throws IOException {
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader(fs.open(path)));
+ List<String> data = new ArrayList<>();
+ String line = br.readLine();
+ data.add(line);
+ while(line != null) {
+ line = br.readLine();
+ data.add(line);
+ }
+ return data;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org