You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/08/03 05:59:22 UTC
[atlas] 07/11: ATLAS-4306: Support for User-specific Spool Directory
This is an automated email from the ASF dual-hosted git repository.
sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 215eafb057ae11ee7103ba1f446db5f143757ec9
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon May 31 21:34:44 2021 -0700
ATLAS-4306: Support for User-specific Spool Directory
(cherry picked from commit 36678ab1f331eb717578c3b2ed6677544ac3aa2a)
---
.../atlas/notification/spool/AtlasFileSpool.java | 4 ++-
.../atlas/notification/spool/IndexManagement.java | 6 ++---
.../notification/spool/SpoolConfiguration.java | 22 +++++++++++++---
.../atlas/notification/spool/SpoolUtils.java | 30 +++++++++++++++++++++-
.../apache/atlas/notification/spool/BaseTest.java | 2 +-
5 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
index ea31284..0c92c30 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -41,6 +41,7 @@ public class AtlasFileSpool implements NotificationInterface {
private final Publisher publisher;
private Thread publisherThread;
private Boolean initDone = null;
+ private String currentUser;
public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) {
this.notificationHandler = notificationHandler;
@@ -56,7 +57,7 @@ public class AtlasFileSpool implements NotificationInterface {
if (!isInitDone()) {
try {
- config.setSource(source);
+ config.setSource(source, this.currentUser);
LOG.info("{}: Initialization: Starting...", this.config.getSourceName());
@@ -86,6 +87,7 @@ public class AtlasFileSpool implements NotificationInterface {
@Override
public void setCurrentUser(String user) {
this.notificationHandler.setCurrentUser(user);
+ this.currentUser = user;
}
@Override
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
index b3a586b..28f9c70 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -54,14 +54,14 @@ public class IndexManagement {
public void init() throws IOException, AtlasException {
String sourceName = config.getSourceName();
- File spoolDir = SpoolUtils.getCreateDirectory(config.getSpoolDir());
-
+ File spoolDir = SpoolUtils.getCreateDirectoryWithPermissionCheck(config.getSpoolDir(), config.getUser());
if (spoolDir == null) {
throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, spoolDir.getAbsolutePath()));
}
- File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir());
+ config.setSpoolDir(spoolDir.getAbsolutePath());
+ File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir());
if (archiveDir == null) {
throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, archiveDir.getAbsolutePath()));
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
index 76f05ef..36ea7be 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -40,19 +40,23 @@ public class SpoolConfiguration {
public static final String PROP_FILE_SPOOL_PAUSE_BEFORE_SEND_SEC = PROPERTY_PREFIX_SPOOL + "pause.before.send.sec";
private static final String PROP_HIVE_METASTORE_NAME = PROPERTY_PREFIX_SPOOL + "hivemetastore.name";
+ private final Configuration config;
+
private final String messageHandlerName;
private final int maxArchivedFilesCount;
private final int messageBatchSize;
private final int retryDestinationMS;
private final int fileRollOverSec;
private final int fileSpoolMaxFilesCount;
- private final String spoolDirPath;
- private final String archiveDir;
+ private String spoolDirPath;
+ private String archiveDir;
private final int pauseBeforeSendSec;
private final String hiveMetaStoreName;
private String sourceName;
+ private String user;
public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
+ this.config = cfg;
this.messageHandlerName = messageHandlerName;
this.maxArchivedFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
@@ -65,8 +69,9 @@ public class SpoolConfiguration {
this.hiveMetaStoreName = cfg.getString(PROP_HIVE_METASTORE_NAME, PROP_HIVE_METASTORE_NAME_DEFAULT);
}
- public void setSource(String val) {
- this.sourceName = val;
+ public void setSource(String source, String user) {
+ this.sourceName = source;
+ this.user = user;
}
public String getSourceName() {
@@ -97,7 +102,12 @@ public class SpoolConfiguration {
return new File(getSpoolDirPath());
}
+ public void setSpoolDir(String absolutePath) {
+ this.spoolDirPath = absolutePath;
+ }
+
public File getArchiveDir() {
+ this.archiveDir = config.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
return new File(this.archiveDir);
}
@@ -136,4 +146,8 @@ public class SpoolConfiguration {
public boolean isHiveMetaStore() {
return this.sourceName.equals(this.hiveMetaStoreName);
}
+
+ public String getUser() {
+ return this.user;
+ }
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
index abbe33d..9ee4c80 100644
--- a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
@@ -37,6 +37,7 @@ import java.text.SimpleDateFormat;
public class SpoolUtils {
private static final Logger LOG = LoggerFactory.getLogger(SpoolUtils.class);
+ private static final String USER_SPECIFIC_PATH_NAME_FORMAT = "%s-%s";
public static final String DEFAULT_CHAR_SET = "UTF-8";
private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
private static final String FILE_EXT_JSON = ".json";
@@ -72,6 +73,33 @@ public class SpoolUtils {
return ret;
}
+    /**
+     * Resolves the directory to use, falling back to a user-specific directory when the
+     * requested one is not both readable and writable by this process.
+     *
+     * The requested directory is first obtained through {@link #getCreateDirectory(File)}.
+     * If it cannot be read or written, a candidate named "<absolute path>-<user>" is
+     * obtained through the same call and returned instead.
+     *
+     * @param file requested directory
+     * @param user user name used to build the fallback directory name
+     * @return the directory to use; whatever {@link #getCreateDirectory(File)} returns for the
+     *         chosen path, which is {@code null} when that directory could not be created
+     */
+    public static File getCreateDirectoryWithPermissionCheck(File file, String user) {
+        File ret = getCreateDirectory(file);
+
+        // Pass the directory so the "{}" placeholder is actually substituted in the log line.
+        LOG.info("SpoolUtils.getCreateDirectory({}): Checking permissions...", file);
+        if (!file.canWrite() || !file.canRead()) {
+            File fileWithUserSuffix = getFileWithUserSuffix(file, user);
+            LOG.error("SpoolUtils.getCreateDirectory({}, {}): Insufficient permissions for user: {}! Will create: {}",
+                    file.getAbsolutePath(), user, user, fileWithUserSuffix);
+            ret = getCreateDirectory(fileWithUserSuffix);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Builds the user-specific variant of a directory path: "<absolute path>-<user>".
+     *
+     * @param file directory to derive the name from
+     * @param user user name appended as the suffix
+     * @return {@code file} itself when it is not an existing directory; otherwise a new
+     *         {@link File} for the suffixed path (the directory is not created here)
+     */
+    private static File getFileWithUserSuffix(File file, String user) {
+        if (!file.isDirectory()) {
+            return file;
+        }
+
+        String absolutePath = file.getAbsolutePath();
+
+        // File.separator ("/" or "\") terminates a directory path; File.pathSeparator (":" or ";")
+        // separates entries of a path list and may legitimately be the last character of a name.
+        // A filesystem root is left intact so the suffixed path stays absolute.
+        boolean isRoot = file.getAbsoluteFile().getParentFile() == null;
+        if (!isRoot && absolutePath.endsWith(File.separator)) {
+            absolutePath = StringUtils.removeEnd(absolutePath, File.separator);
+        }
+
+        return new File(String.format(USER_SPECIFIC_PATH_NAME_FORMAT, absolutePath, user));
+    }
+
public static File getCreateDirectory(File file) {
File ret = file;
@@ -79,7 +107,7 @@ public class SpoolUtils {
boolean result = file.mkdirs();
if (!file.isDirectory() || !result) {
- LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!", file.toString());
+ LOG.error("SpoolUtils.getCreateDirectory({}): cannot be created!", file);
ret = null;
}
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
index 304c821..83971f6 100644
--- a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
@@ -50,7 +50,7 @@ public class BaseTest {
public SpoolConfiguration getSpoolConfiguration(String spoolDir, String handlerName) {
SpoolConfiguration cfg = new SpoolConfiguration(getConfiguration(spoolDir), handlerName);
- cfg.setSource(SOURCE_TEST);
+ cfg.setSource(SOURCE_TEST, "testuser");
return cfg;
}