You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/08/03 05:59:22 UTC

[atlas] 07/11: ATLAS-4306: Support for User-specific Spool Directory

This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 215eafb057ae11ee7103ba1f446db5f143757ec9
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon May 31 21:34:44 2021 -0700

    ATLAS-4306: Support for User-specific Spool Directory
    
    (cherry picked from commit 36678ab1f331eb717578c3b2ed6677544ac3aa2a)
---
 .../atlas/notification/spool/AtlasFileSpool.java   |  4 ++-
 .../atlas/notification/spool/IndexManagement.java  |  6 ++---
 .../notification/spool/SpoolConfiguration.java     | 22 +++++++++++++---
 .../atlas/notification/spool/SpoolUtils.java       | 30 +++++++++++++++++++++-
 .../apache/atlas/notification/spool/BaseTest.java  |  2 +-
 5 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index ea31284..0c92c30 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -41,6 +41,7 @@ public class AtlasFileSpool implements NotificationInterface {
     private final Publisher            publisher;
     private       Thread               publisherThread;
     private       Boolean              initDone = null;
+    private       String               currentUser;
 
     public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) {
         this.notificationHandler = notificationHandler;
@@ -56,7 +57,7 @@ public class AtlasFileSpool implements NotificationInterface {
 
         if (!isInitDone()) {
             try {
-                config.setSource(source);
+                config.setSource(source, this.currentUser);
 
                 LOG.info("{}: Initialization: Starting...", this.config.getSourceName());
 
@@ -86,6 +87,7 @@ public class AtlasFileSpool implements NotificationInterface {
     @Override
     public void setCurrentUser(String user) {
         this.notificationHandler.setCurrentUser(user);
+        this.currentUser = user;
     }
 
     @Override
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
index b3a586b..28f9c70 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -54,14 +54,14 @@ public class IndexManagement {
     public void init() throws IOException, AtlasException {
         String sourceName = config.getSourceName();
 
-        File spoolDir = SpoolUtils.getCreateDirectory(config.getSpoolDir());
-
+        File spoolDir = SpoolUtils.getCreateDirectoryWithPermissionCheck(config.getSpoolDir(), config.getUser());
         if (spoolDir == null) {
             throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, spoolDir.getAbsolutePath()));
         }
 
-        File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir());
+        config.setSpoolDir(spoolDir.getAbsolutePath());
 
+        File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir());
         if (archiveDir == null) {
             throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, archiveDir.getAbsolutePath()));
         }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
index 76f05ef..36ea7be 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -40,19 +40,23 @@ public class SpoolConfiguration {
     public  static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC           = PROPERTY_PREFIX_SPOOL + "pause.before.send.sec";
     private static final String PROP_HIVE_METASTORE_NAME                        = PROPERTY_PREFIX_SPOOL + "hivemetastore.name";
 
+    private final Configuration config;
+
     private final String messageHandlerName;
     private final int    maxArchivedFilesCount;
     private final int    messageBatchSize;
     private final int    retryDestinationMS;
     private final int    fileRollOverSec;
     private final int    fileSpoolMaxFilesCount;
-    private final String spoolDirPath;
-    private final String archiveDir;
+    private       String spoolDirPath;
+    private       String archiveDir;
     private final int    pauseBeforeSendSec;
     private final String hiveMetaStoreName;
     private       String sourceName;
+     private      String user;
 
     public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
+        this.config = cfg;
         this.messageHandlerName     = messageHandlerName;
         this.maxArchivedFilesCount  = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
         this.messageBatchSize       = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
@@ -65,8 +69,9 @@ public class SpoolConfiguration {
         this.hiveMetaStoreName      = cfg.getString(PROP_HIVE_METASTORE_NAME, PROP_HIVE_METASTORE_NAME_DEFAULT);
     }
 
-    public void setSource(String val) {
-        this.sourceName = val;
+    public void setSource(String source, String user) {
+        this.sourceName = source;
+        this.user       = user;
     }
 
     public String getSourceName() {
@@ -97,7 +102,12 @@ public class SpoolConfiguration {
         return new File(getSpoolDirPath());
     }
 
+    public void setSpoolDir(String absolutePath) {
+        this.spoolDirPath = absolutePath;
+    }
+
     public File getArchiveDir() {
+        this.archiveDir = config.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
         return new File(this.archiveDir);
     }
 
@@ -136,4 +146,8 @@ public class SpoolConfiguration {
     public boolean isHiveMetaStore() {
         return this.sourceName.equals(this.hiveMetaStoreName);
     }
+
+    public String getUser() {
+        return this.user;
+    }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
index abbe33d..9ee4c80 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
@@ -37,6 +37,7 @@ import java.text.SimpleDateFormat;
 public class SpoolUtils {
     private static final Logger LOG = LoggerFactory.getLogger(SpoolUtils.class);
 
+    private static final String           USER_SPECIFIC_PATH_NAME_FORMAT = "%s-%s";
     public  static final String           DEFAULT_CHAR_SET              = "UTF-8";
     private static final String           DEFAULT_LINE_SEPARATOR        = System.getProperty("line.separator");
     private static final String           FILE_EXT_JSON                 = ".json";
@@ -72,6 +73,33 @@ public class SpoolUtils {
         return ret;
     }
 
+    public static File getCreateDirectoryWithPermissionCheck(File file, String user) {
+        File ret = getCreateDirectory(file);
+
+        LOG.info("SpoolUtils.getCreateDirectory({}): Checking permissions...");
+        if (!file.canWrite() || !file.canRead()) {
+            File fileWithUserSuffix = getFileWithUserSuffix(file, user);
+            LOG.error("SpoolUtils.getCreateDirectory({}, {}): Insufficient permissions for user: {}! Will create: {}",
+                    file.getAbsolutePath(), user, user, fileWithUserSuffix);
+            ret = getCreateDirectory(fileWithUserSuffix);
+        }
+
+        return ret;
+    }
+
+    private static File getFileWithUserSuffix(File file, String user) {
+        if (!file.isDirectory()) {
+            return file;
+        }
+
+        String absolutePath = file.getAbsolutePath();
+        if (absolutePath.endsWith(File.pathSeparator)) {
+            absolutePath = StringUtils.removeEnd(absolutePath, File.pathSeparator);
+        }
+
+        return new File(String.format(USER_SPECIFIC_PATH_NAME_FORMAT, absolutePath, user));
+    }
+
     public static File getCreateDirectory(File file) {
         File ret = file;
 
@@ -79,7 +107,7 @@ public class SpoolUtils {
             boolean result = file.mkdirs();
 
             if (!file.isDirectory() || !result) {
-                LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!", file.toString());
+                LOG.error("SpoolUtils.getCreateDirectory({}): cannot be created!", file);
 
                 ret = null;
             }
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
index 304c821..83971f6 100644
--- a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
@@ -50,7 +50,7 @@ public class BaseTest {
 
     public SpoolConfiguration getSpoolConfiguration(String spoolDir, String handlerName) {
         SpoolConfiguration cfg = new SpoolConfiguration(getConfiguration(spoolDir), handlerName);
-        cfg.setSource(SOURCE_TEST);
+        cfg.setSource(SOURCE_TEST, "testuser");
         return cfg;
     }