You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/02/27 22:55:44 UTC

[hadoop] branch trunk updated: YARN-3841 [atsv2 Storage implementation] Adding retry semantics to HDFS backing storage. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ea3cdc6  YARN-3841 [atsv2 Storage implementation] Adding retry semantics to HDFS backing storage. Contributed by Abhishek Modi.
ea3cdc6 is described below

commit ea3cdc60b39d96702c0bce292829914c25bc0d8e
Author: Vrushali C <vr...@apache.org>
AuthorDate: Wed Feb 27 14:55:35 2019 -0800

    YARN-3841 [atsv2 Storage implementation] Adding retry semantics to HDFS backing storage. Contributed by Abhishek Modi.
---
 .../storage/FileSystemTimelineWriterImpl.java      | 217 ++++++++++++++++-----
 .../storage/TestFileSystemTimelineWriterImpl.java  |  51 +++--
 2 files changed, 207 insertions(+), 61 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index ac0902f..c284f8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
@@ -35,14 +35,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
+import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This implements a local file based backend for storing application timeline
+ * This implements a FileSystem based backend for storing application timeline
  * information. This implementation may not provide a complete implementation of
  * all the necessary features. This implementation is provided solely for basic
  * testing purposes, and should not be used in a non-test situation.
@@ -52,20 +55,36 @@ import com.google.common.annotations.VisibleForTesting;
 public class FileSystemTimelineWriterImpl extends AbstractService
     implements TimelineWriter {
 
-  private String outputRoot;
-
   /** Config param for timeline service storage tmp root for FILE YARN-3264. */
-  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
-      = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+  public static final String TIMELINE_FS_WRITER_NUM_RETRIES =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.num-retries";
+  public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0;
+
+  public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS =
+       YarnConfiguration.TIMELINE_SERVICE_PREFIX +
+               "fs-writer.retry-interval-ms";
+  public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = 1000L;
 
   public static final String ENTITIES_DIR = "entities";
 
   /** Default extension for output files. */
   public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
 
+  private FileSystem fs;
+  private Path rootPath;
+  private int fsNumRetries;
+  private long fsRetryInterval;
+  private Path entitiesPath;
+
   /** default value for storage location on local disk. */
   private static final String STORAGE_DIR_ROOT = "timeline_service_data";
 
+  private static final Logger LOG =
+          LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);
+
   FileSystemTimelineWriterImpl() {
     super((FileSystemTimelineWriterImpl.class.getName()));
   }
@@ -83,8 +102,8 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     String appId = context.getAppId();
 
     for (TimelineEntity entity : entities.getEntities()) {
-      write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
-          response);
+      writeInternal(clusterId, userId, flowName, flowVersion,
+              flowRunId, appId, entity, response);
     }
     return response;
   }
@@ -97,59 +116,78 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     return null;
   }
 
-  private synchronized void write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRun, String appId,
-      TimelineEntity entity, TimelineWriteResponse response)
-      throws IOException {
-    PrintWriter out = null;
+  private synchronized void writeInternal(String clusterId, String userId,
+                                          String flowName, String flowVersion,
+                                          long flowRun, String appId,
+                                          TimelineEntity entity,
+                                          TimelineWriteResponse response)
+                                          throws IOException {
+    Path clusterIdPath = new Path(entitiesPath, clusterId);
+    Path userIdPath = new Path(clusterIdPath, userId);
+    Path flowNamePath = new Path(userIdPath, escape(flowName));
+    Path flowVersionPath = new Path(flowNamePath, escape(flowVersion));
+    Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun));
+    Path appIdPath = new Path(flowRunPath, appId);
+    Path entityTypePath = new Path(appIdPath, entity.getType());
     try {
-      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
-          escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
-          entity.getType());
-      String fileName = dir + entity.getId() +
-          TIMELINE_SERVICE_STORAGE_EXTENSION;
-      out =
-          new PrintWriter(new BufferedWriter(new OutputStreamWriter(
-              new FileOutputStream(fileName, true), "UTF-8")));
-      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
-      out.write("\n");
-    } catch (IOException ioe) {
-      TimelineWriteError error = new TimelineWriteError();
-      error.setEntityId(entity.getId());
-      error.setEntityType(entity.getType());
+      mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath,
+              flowNamePath, flowVersionPath, flowRunPath, appIdPath,
+              entityTypePath);
+      Path filePath =
+              new Path(entityTypePath,
+                      entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+      createFileWithRetries(filePath);
+
+      byte[] record =  new StringBuilder()
+              .append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
+              .append("\n").toString().getBytes("UTF-8");
+      writeFileWithRetries(filePath, record);
+    } catch (Exception ioe) { // incl. InterruptedException while retrying
+      LOG.warn("Failed to write entity " + entity.getId(), ioe);
+      TimelineWriteError error = createTimelineWriteError(entity);
       /*
        * TODO: set an appropriate error code after PoC could possibly be:
        * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
        */
       response.addError(error);
-    } finally {
-      if (out != null) {
-        out.close();
-      }
     }
   }
 
+  private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
+    final TimelineWriteError writeError = new TimelineWriteError();
+    writeError.setEntityType(entity.getType());
+    writeError.setEntityId(entity.getId());
+    return writeError; // no error code is set here
+  }
+
   @Override
   public TimelineWriteResponse aggregate(TimelineEntity data,
       TimelineAggregationTrack track) throws IOException {
     return null;
-
   }
 
   @VisibleForTesting
   String getOutputRoot() {
-    return outputRoot;
+    return rootPath.toString();
   }
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+    String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
         conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
+    rootPath = new Path(outputRoot);
+    entitiesPath = new Path(rootPath, ENTITIES_DIR);
+    fsNumRetries = conf.getInt(TIMELINE_FS_WRITER_NUM_RETRIES,
+            DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
+    fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS,
+            DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
+    fs = rootPath.getFileSystem(getConfig());
   }
 
   @Override
   public void serviceStart() throws Exception {
-    mkdirs(outputRoot, ENTITIES_DIR);
+    mkdirsWithRetries(rootPath);
+    mkdirsWithRetries(entitiesPath);
   }
 
   @Override
@@ -157,18 +195,103 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     // no op
   }
 
-  private static String mkdirs(String... dirStrs) throws IOException {
-    StringBuilder path = new StringBuilder();
-    for (String dirStr : dirStrs) {
-      path.append(dirStr).append(File.separatorChar);
-      File dir = new File(path.toString());
-      if (!dir.exists()) {
-        if (!dir.mkdirs()) {
-          throw new IOException("Could not create directories for " + dir);
+  private void mkdirs(Path... paths) throws IOException, InterruptedException {
+    for (int i = 0; i < paths.length; i++) {
+      if (!existsWithRetries(paths[i])) { // create only what is missing
+        mkdirsWithRetries(paths[i]);
+      }
+    }
+  }
+
+  // Code from FSRMStateStore: fs.mkdirs(dirPath), retried via FSAction.
+  private void mkdirsWithRetries(final Path dirPath)
+          throws IOException, InterruptedException {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws IOException {
+        fs.mkdirs(dirPath);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private void writeFileWithRetries(final Path outputPath, final byte[] data)
+          throws IOException, InterruptedException {
+    new FSAction<Void>() { // writeFile is retried on IOException
+      @Override
+      public Void run() throws IOException {
+        writeFile(outputPath, data);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private boolean createFileWithRetries(final Path newFile)
+          throws IOException, InterruptedException {
+    return new FSAction<Boolean>() { // createFile, retried on IOException
+      @Override
+      public Boolean run() throws IOException {
+        return createFile(newFile);
+      }
+    }.runWithRetries();
+  }
+
+  private boolean existsWithRetries(final Path path)
+          throws IOException, InterruptedException {
+    return new FSAction<Boolean>() { // fs.exists, retried on IOException
+      @Override
+      public Boolean run() throws IOException {
+        return fs.exists(path);
+      }
+    }.runWithRetries();
+  }
+
+  private abstract class FSAction<T> {
+    abstract T run() throws IOException; // one attempt of the FS operation
+    // Calls run() again after an IOException, at most fsNumRetries retries.
+    T runWithRetries() throws IOException, InterruptedException {
+      int retry = 0;
+      while (true) {
+        try {
+          return run();
+        } catch (IOException e) {
+          LOG.info("Exception while executing a FS operation.", e);
+          if (++retry > fsNumRetries) {
+            LOG.info("Maxed out FS retries. Giving up!");
+            throw e;
+          }
+          LOG.info("Will retry operation on FS. Retry no. " + retry +
+              " after sleeping for " + fsRetryInterval + " ms");
+          Thread.sleep(fsRetryInterval);
         }
       }
     }
-    return path.toString();
+  }
+
+  private boolean createFile(Path newFile) throws IOException {
+    return fs.createNewFile(newFile); // presumably false if it exists
+  }
+
+  /**
+   * Writes data to a ".tmp" sibling of outputPath and then renames that
+   * file to outputPath, so the data shows up at outputPath in one step.
+   * Here we are assuming that rename is atomic for underlying file system.
+   *
+   * @param outputPath final location of the record.
+   * @param data bytes to store.
+   * @throws IOException if the temporary file cannot be written, or if
+   *         the rename to outputPath is refused.
+   */
+  protected void writeFile(Path outputPath, byte[] data) throws IOException {
+    Path tempPath =
+            new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
+    try (FSDataOutputStream fsOut = fs.create(tempPath, true)) {
+      fsOut.write(data);
+    }
+    if (!fs.rename(tempPath, outputPath)) {
+      throw new IOException(
+              "Could not rename " + tempPath + " to " + outputPath);
+    }
+  }
 
   // specifically escape the separator character
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index bb9f54f..4073b85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -20,16 +20,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.BufferedReader;
 import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -96,16 +99,20 @@ public class TestFileSystemTimelineWriterImpl {
               "flow_version", 12345678L, "app_id"),
           te, UserGroupInformation.createRemoteUser("user_id"));
 
-      String fileName = fsi.getOutputRoot() + File.separator + "entities" +
+      String fileName = outputRoot + File.separator + "entities" +
           File.separator + "cluster_id" + File.separator + "user_id" +
           File.separator + "flow_name" + File.separator + "flow_version" +
           File.separator + "12345678" + File.separator + "app_id" +
           File.separator + type + File.separator + id +
           FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      Path path = Paths.get(fileName);
-      File f = new File(fileName);
-      assertTrue(f.exists() && !f.isDirectory());
-      List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue("Specified path(" + fileName + ") should exist: ",
+              fs.exists(path));
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertTrue("Specified path should be a file",
+              !fileStatus.isDirectory());
+      List<String> data = readFromFile(fs, path);
       // ensure there's only one entity + 1 new line
       assertTrue("data size is:" + data.size(), data.size() == 2);
       String d = data.get(0);
@@ -119,12 +126,15 @@ public class TestFileSystemTimelineWriterImpl {
           File.separator + "12345678" + File.separator + "app_id" +
           File.separator + type2 + File.separator + id2 +
           FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      Path path2 = Paths.get(fileName2);
-      File file = new File(fileName2);
-      assertTrue(file.exists() && !file.isDirectory());
-      List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+      Path path2 = new Path(fileName2);
+      assertTrue("Specified path(" + fileName + ") should exist: ",
+              fs.exists(path2));
+      FileStatus fileStatus2 = fs.getFileStatus(path2);
+      assertTrue("Specified path should be a file",
+              !fileStatus2.isDirectory());
+      List<String> data2 = readFromFile(fs, path2);
       // ensure there's only one entity + 1 new line
-      assertTrue("data size is:" + data.size(), data2.size() == 2);
+      assertTrue("data size is:" + data2.size(), data2.size() == 2);
       String metricToString = data2.get(0);
       // confirm the contents same as what was written
       assertEquals(metricToString,
@@ -136,4 +146,17 @@ public class TestFileSystemTimelineWriterImpl {
     }
   }
 
+  private List<String> readFromFile(FileSystem fs, Path path)
+          throws IOException {
+    List<String> data = new ArrayList<>();
+    try (BufferedReader br = new BufferedReader(
+            new InputStreamReader(fs.open(path), "UTF-8"))) {
+      String line;
+      do {
+        line = br.readLine();
+        data.add(line); // the final null (end of file) is kept as an entry
+      } while (line != null);
+    }
+    return data;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org