You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/03/13 04:33:29 UTC

[hadoop] branch trunk updated: YARN-9338 Timeline related testcases are failing. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17a3e14  YARN-9338 Timeline related testcases are failing. Contributed by Abhishek Modi.
17a3e14 is described below

commit 17a3e14d25877af90ef6655750ce2b035c2982b5
Author: Vrushali C <vr...@apache.org>
AuthorDate: Tue Mar 12 21:33:17 2019 -0700

    YARN-9338 Timeline related testcases are failing. Contributed by Abhishek Modi.
---
 .../security/TestTimelineAuthFilterForV2.java      |   4 +
 .../storage/FileSystemTimelineWriterImpl.java      |  30 +++---
 .../storage/TestFileSystemTimelineWriterImpl.java  | 119 +++++++++++++++++++++
 3 files changed, 139 insertions(+), 14 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
index c353cf0..95a008a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
@@ -44,7 +44,9 @@ import java.util.concurrent.Callable;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.minikdc.MiniKdc;
@@ -144,6 +146,8 @@ public class TestTimelineAuthFilterForV2 {
     // Setup timeline service v2.
     try {
       conf = new Configuration(false);
+      conf.setClass("fs.file.impl", RawLocalFileSystem.class,
+          FileSystem.class);
       conf.setStrings(TimelineAuthenticationFilterInitializer.PREFIX + "type",
           "kerberos");
       conf.set(TimelineAuthenticationFilterInitializer.PREFIX +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index c284f8f..023d496 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -78,6 +79,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
   private int fsNumRetries;
   private long fsRetryInterval;
   private Path entitiesPath;
+  private Configuration config;
 
   /** default value for storage location on local disk. */
   private static final String STORAGE_DIR_ROOT = "timeline_service_data";
@@ -122,17 +124,13 @@ public class FileSystemTimelineWriterImpl extends AbstractService
                                           TimelineEntity entity,
                                           TimelineWriteResponse response)
                                           throws IOException {
-    Path clusterIdPath = new Path(entitiesPath, clusterId);
-    Path userIdPath = new Path(clusterIdPath, userId);
-    Path flowNamePath = new Path(userIdPath, escape(flowName));
-    Path flowVersionPath = new Path(flowNamePath, escape(flowVersion));
-    Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun));
-    Path appIdPath = new Path(flowRunPath, appId);
-    Path entityTypePath = new Path(appIdPath, entity.getType());
+    String entityTypePathStr = clusterId + File.separator + userId +
+        File.separator + escape(flowName) + File.separator +
+        escape(flowVersion) + File.separator + flowRun + File.separator + appId
+        + File.separator + entity.getType();
+    Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
     try {
-      mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath,
-              flowNamePath, flowVersionPath, flowRunPath, appIdPath,
-              entityTypePath);
+      mkdirs(entityTypePath);
       Path filePath =
               new Path(entityTypePath,
                       entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
@@ -181,7 +179,8 @@ public class FileSystemTimelineWriterImpl extends AbstractService
             DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
     fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS,
             DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
-    fs = rootPath.getFileSystem(getConfig());
+    config = conf;
+    fs = rootPath.getFileSystem(config);
   }
 
   @Override
@@ -285,12 +284,15 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     // final status.
     try {
       fsOut = fs.create(tempPath, true);
+      FSDataInputStream fsIn = fs.open(outputPath);
+      IOUtils.copyBytes(fsIn, fsOut, config, false);
+      fsIn.close();
+      fs.delete(outputPath, false);
       fsOut.write(data);
       fsOut.close();
-      fsOut = null;
       fs.rename(tempPath, outputPath);
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, fsOut);
+    } catch (IOException ie) {
+      LOG.error("Got an exception while writing file", ie);
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 4073b85..b880b9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -146,6 +146,125 @@ public class TestFileSystemTimelineWriterImpl {
     }
   }
 
+  @Test
+  public void testWriteMultipleEntities() throws Exception {
+    String id = "appId";
+    String type = "app";
+
+    TimelineEntities te1 = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    te1.addEntity(entity);
+
+    TimelineEntities te2 = new TimelineEntities();
+    TimelineEntity entity2 = new TimelineEntity();
+    entity2.setId(id);
+    entity2.setType(type);
+    entity2.setCreatedTime(1425016503000L);
+    te2.addEntity(entity2);
+
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
+      Configuration conf = new YarnConfiguration();
+      String outputRoot = tmpFolder.newFolder().getAbsolutePath();
+      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+          outputRoot);
+      fsi.init(conf);
+      fsi.start();
+      fsi.write(
+          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
+              "flow_version", 12345678L, "app_id"),
+          te1, UserGroupInformation.createRemoteUser("user_id"));
+      fsi.write(
+          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
+              "flow_version", 12345678L, "app_id"),
+          te2, UserGroupInformation.createRemoteUser("user_id"));
+
+      String fileName = outputRoot + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "flow_name" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type + File.separator + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue("Specified path(" + fileName + ") should exist: ",
+          fs.exists(path));
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertTrue("Specified path should be a file",
+          !fileStatus.isDirectory());
+      List<String> data = readFromFile(fs, path);
+      assertTrue("data size is:" + data.size(), data.size() == 3);
+      String d = data.get(0);
+      // first element read back must equal the JSON dump of entity (te1)
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+
+
+      String metricToString = data.get(1);
+      // second element must equal the JSON dump of entity2 (not a metric)
+      assertEquals(metricToString,
+          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+      }
+    }
+  }
+
+  @Test
+  public void testWriteEntitiesWithEmptyFlowName() throws Exception {
+    String id = "appId";
+    String type = "app";
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    te.addEntity(entity);
+
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
+      Configuration conf = new YarnConfiguration();
+      String outputRoot = tmpFolder.newFolder().getAbsolutePath();
+      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+          outputRoot);
+      fsi.init(conf);
+      fsi.start();
+      fsi.write(
+          new TimelineCollectorContext("cluster_id", "user_id", "",
+              "flow_version", 12345678L, "app_id"),
+          te, UserGroupInformation.createRemoteUser("user_id"));
+
+      String fileName = outputRoot + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type + File.separator + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue("Specified path(" + fileName + ") should exist: ",
+          fs.exists(path));
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertTrue("Specified path should be a file",
+          !fileStatus.isDirectory());
+      List<String> data = readFromFile(fs, path);
+      assertTrue("data size is:" + data.size(), data.size() == 2);
+      String d = data.get(0);
+      // first element read back must equal the JSON dump of entity
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+      }
+    }
+  }
+
   private List<String> readFromFile(FileSystem fs, Path path)
           throws IOException {
     BufferedReader br = new BufferedReader(


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org