You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/10/22 21:08:19 UTC
[atlas] branch master updated: ATLAS-3427: Atlas Hook Enhancements
for improved resiliency.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new de87bc5 ATLAS-3427: Atlas Hook Enhancements for improved resiliency.
de87bc5 is described below
commit de87bc5022627d82cb8e6048b6728e7028a4af25
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu Oct 22 13:45:47 2020 -0700
ATLAS-3427: Atlas Hook Enhancements for improved resiliency.
---
.../java/org/apache/atlas/hive/hook/HiveHook.java | 5 +
.../atlas/hive/hook/HiveMetastoreHookImpl.java | 2 +-
notification/pom.xml | 12 +
.../main/java/org/apache/atlas/hook/AtlasHook.java | 10 +
.../apache/atlas/hook/FailedMessagesLogger.java | 39 +-
.../apache/atlas/kafka/NotificationProvider.java | 48 +-
.../atlas/notification/AbstractNotification.java | 6 +-
.../apache/atlas/notification/LogConfigUtils.java | 108 +++++
.../atlas/notification/NotificationException.java | 4 +
.../atlas/notification/NotificationInterface.java | 8 +
.../apache/atlas/notification/spool/Archiver.java | 125 ++++++
.../atlas/notification/spool/AtlasFileSpool.java | 163 +++++++
.../atlas/notification/spool/FileOperations.java | 67 +++
.../atlas/notification/spool/IndexManagement.java | 487 +++++++++++++++++++++
.../apache/atlas/notification/spool/Publisher.java | 210 +++++++++
.../notification/spool/SpoolConfiguration.java | 123 ++++++
.../atlas/notification/spool/SpoolUtils.java | 173 ++++++++
.../apache/atlas/notification/spool/Spooler.java | 127 ++++++
.../notification/spool/models/IndexRecord.java | 221 ++++++++++
.../notification/spool/models/IndexRecords.java | 89 ++++
.../spool/utils/local/FileLockedReadWrite.java | 73 +++
.../utils/local/FileOpAppend.java} | 33 +-
.../spool/utils/local/FileOpCompaction.java | 56 +++
.../spool/utils/local/FileOpDelete.java} | 42 +-
.../notification/spool/utils/local/FileOpRead.java | 66 +++
.../spool/utils/local/FileOpUpdate.java | 60 +++
.../spool/utils/local/FileOperation.java | 181 ++++++++
.../notification/AbstractNotificationTest.java | 2 +-
.../notification/spool/AtlasFileSpoolTest.java | 228 ++++++++++
.../apache/atlas/notification/spool/BaseTest.java | 78 ++++
.../notification/spool/IndexManagementTest.java | 189 ++++++++
.../src/test/resources/spool/archive/spool-1.json | 3 +
.../src/test/resources/spool/index-test-src-1.json | 2 +
.../resources/spool/index-test-src-1_closed.json | 1 +
pom.xml | 1 +
35 files changed, 2962 insertions(+), 80 deletions(-)
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 6513234..e48967d 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -164,6 +164,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public HiveHook() {
}
+ public HiveHook(String name) {
+ super(name);
+ }
+
+
@Override
public void run(HookContext hookContext) throws Exception {
if (LOG.isDebugEnabled()) {
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
index 3c0f0c1..f01419c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
@@ -45,7 +45,7 @@ public class HiveMetastoreHookImpl extends MetaStoreEventListener {
public HiveMetastoreHookImpl(Configuration config) {
super(config);
- this.hiveHook = new HiveHook();
+ this.hiveHook = new HiveHook(this.getClass().getSimpleName());
this.hook = new HiveMetastoreHook();
}
diff --git a/notification/pom.xml b/notification/pom.xml
index 8affd59..740e8e5 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -56,6 +56,18 @@
</dependency>
<dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.binary.version}</artifactId>
</dependency>
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 8659126..26c2d8f 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -126,6 +126,7 @@ public abstract class AtlasHook {
try {
LOG.info("==> Shutdown of Atlas Hook");
+ notificationInterface.close();
executor.shutdown();
executor.awaitTermination(SHUTDOWN_HOOK_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
executor = null;
@@ -141,6 +142,15 @@ public abstract class AtlasHook {
LOG.info("Created Atlas Hook");
}
+ public AtlasHook() {
+ notificationInterface.init(this.getClass().getSimpleName(), failedMessagesLogger);
+ }
+
+ public AtlasHook(String name) {
+ LOG.info("AtlasHook: Spool name: Passed from caller.: {}", name);
+ notificationInterface.init(name, failedMessagesLogger);
+ }
+
/**
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
diff --git a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
index b319e81..5488c1c 100644
--- a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
+++ b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
@@ -19,16 +19,14 @@
package org.apache.atlas.hook;
-import org.apache.log4j.Appender;
+import org.apache.atlas.notification.LogConfigUtils;
import org.apache.log4j.DailyRollingFileAppender;
-import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import java.io.File;
import java.io.IOException;
-import java.util.Enumeration;
/**
* A logger wrapper that can be used to write messages that failed to be sent to a log file.
@@ -46,7 +44,7 @@ public class FailedMessagesLogger {
}
void init() {
- String rootLoggerDirectory = getRootLoggerDirectory();
+ String rootLoggerDirectory = LogConfigUtils.getRootDir();
if (rootLoggerDirectory == null) {
return;
}
@@ -62,38 +60,7 @@ public class FailedMessagesLogger {
}
}
- /**
- * Get the root logger file location under which the failed log messages will be written.
- *
- * Since this class is used in Hooks which run within JVMs of other components like Hive,
- * we want to write the failed messages file under the same location as where logs from
- * the host component are saved. This method attempts to get such a location from the
- * root logger's appenders. It will work only if at least one of the appenders is a {@link FileAppender}
- *
- * @return directory under which host component's logs are stored.
- */
- private String getRootLoggerDirectory() {
- String rootLoggerDirectory = null;
- Logger rootLogger = Logger.getRootLogger();
- Enumeration allAppenders = rootLogger.getAllAppenders();
-
- if (allAppenders != null) {
- while (allAppenders.hasMoreElements()) {
- Appender appender = (Appender) allAppenders.nextElement();
-
- if (appender instanceof FileAppender) {
- FileAppender fileAppender = (FileAppender) appender;
- String rootLoggerFile = fileAppender.getFile();
-
- rootLoggerDirectory = rootLoggerFile != null ? new File(rootLoggerFile).getParent() : null;
- break;
- }
- }
- }
- return rootLoggerDirectory;
- }
-
- void log(String message) {
+ public void log(String message) {
logger.error(message);
}
}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
index 2dd970e..b35af97 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
@@ -19,23 +19,59 @@ package org.apache.atlas.kafka;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.LogConfigUtils;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.spool.AtlasFileSpool;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
/**
* Provider class for Notification interfaces
*/
public class NotificationProvider {
- private static KafkaNotification kafka;
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
+
+ private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = "atlas.hook.spool.enabled";
+ private static final String CONF_ATLAS_HOOK_SPOOL_DIR = "atlas.hook.spool.dir";
+
+ private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false;
- public static KafkaNotification get() {
- if (kafka == null) {
+ private static NotificationInterface notificationProvider;
+
+ public static NotificationInterface get() {
+ if (notificationProvider == null) {
try {
- Configuration applicationProperties = ApplicationProperties.get();
- kafka = new KafkaNotification(applicationProperties);
+ Configuration conf = ApplicationProperties.get();
+ KafkaNotification kafka = new KafkaNotification(conf);
+ String spoolDir = getSpoolDir(conf);
+
+ if (isSpoolingEnabled(conf) && StringUtils.isNotEmpty(spoolDir)) {
+ LOG.info("Notification spooling is enabled: spool directory={}", spoolDir);
+
+ conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir);
+
+ notificationProvider = new AtlasFileSpool(conf, kafka);
+ } else {
+ LOG.info("Notification spooling is not enabled");
+
+ notificationProvider = kafka;
+ }
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
- return kafka;
+ return notificationProvider;
+ }
+
+ private static boolean isSpoolingEnabled(Configuration configuration) {
+ return configuration.getBoolean(CONF_ATLAS_HOOK_SPOOL_ENABLED, CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT);
+ }
+
+ private static String getSpoolDir(Configuration configuration) {
+ return configuration.getString(CONF_ATLAS_HOOK_SPOOL_DIR);
}
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 45a66bf..c45a1da 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -76,6 +76,10 @@ public abstract class AbstractNotification implements NotificationInterface {
protected AbstractNotification() {
}
+ @Override
+ public void init(String source, Object failedMessagesLogger) {
+ }
+
// ----- NotificationInterface -------------------------------------------
@Override
@@ -108,7 +112,7 @@ public abstract class AbstractNotification implements NotificationInterface {
*
* @throws NotificationException if an error occurs while sending
*/
- protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
+ public abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
// ----- utility methods -------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java b/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java
new file mode 100644
index 0000000..dc98592
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.FileAppender;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.appender.RollingRandomAccessFileAppender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Enumeration;
+
+public class LogConfigUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(LogConfigUtils.class);
+
+ public static String getRootDir() {
+ String ret = getFileAppenderPath();
+
+ if (StringUtils.isEmpty(ret)) {
+ ret = getFileAppenderPathApproach2();
+ }
+
+ if (StringUtils.isNotEmpty(ret)) {
+ ret = StringUtils.substringBeforeLast(ret, File.separator);
+ } else {
+ ret = null;
+ }
+
+ LOG.info("getRootDir(): ret={}", ret);
+
+ return ret;
+ }
+
+ private static String getFileAppenderPath() {
+ String ret = StringUtils.EMPTY;
+ LoggerContext loggerContext = (LoggerContext) LogManager.getContext();
+ Configuration configuration = loggerContext.getConfiguration();
+
+ for (Appender appender : configuration.getAppenders().values()) {
+ if (appender instanceof RollingRandomAccessFileAppender) {
+ ret = ((RollingRandomAccessFileAppender) appender).getFileName();
+ break;
+ } else if (appender instanceof RollingFileAppender) {
+ ret = ((RollingRandomAccessFileAppender) appender).getFileName();
+ break;
+ } else if (appender instanceof FileAppender) {
+ ret = ((FileAppender) appender).getFileName();
+ break;
+ } else {
+ LOG.info("Could not infer log path from this appender: {}", appender.getClass().getName());
+ }
+ }
+
+ LOG.info("getFileAppenderPath(): ret={}", ret);
+
+ return ret;
+ }
+
+ private static String getFileAppenderPathApproach2() {
+ String ret = null;
+
+ try {
+ org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+ Enumeration allAppenders = rootLogger.getAllAppenders();
+
+ if (allAppenders != null) {
+ while (allAppenders.hasMoreElements()) {
+ Object appender = allAppenders.nextElement();
+
+ if (appender instanceof org.apache.log4j.FileAppender) {
+ org.apache.log4j.FileAppender fileAppender = (org.apache.log4j.FileAppender) appender;
+
+ ret = fileAppender.getName();
+
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("getFileAppenderPathApproach2(): failed to get appender path", e);
+ }
+
+ LOG.info("getFileAppenderPathApproach2(): ret={}", ret);
+
+ return ret;
+ }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
index 2dd9c9f..353d650 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
@@ -31,6 +31,10 @@ public class NotificationException extends AtlasException {
super(e);
}
+ public NotificationException(Exception e, String errorMsg) {
+ super(errorMsg, e);
+ }
+
public NotificationException(Exception e, List<String> failedMessages) {
super(e);
this.failedMessages = failedMessages;
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 6caf7e2..edd8ed9 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -61,6 +61,14 @@ public interface NotificationInterface {
/**
*
+ * Initializes the notification implementation for the given source.
+ * @param source name of the source
+ * @param failedMessagesLogger logger for failed messages
+ */
+ void init(String source, Object failedMessagesLogger);
+
+ /**
+ *
* @param user Name of the user under which the processes is running
*/
void setCurrentUser(String user);
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java b/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java
new file mode 100644
index 0000000..9e12d26
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+
+public class Archiver {
+ private static final Logger LOG = LoggerFactory.getLogger(Archiver.class);
+
+ private final String source;
+ private final File indexDoneFile;
+ private final File archiveFolder;
+ private final int maxArchiveFiles;
+
+ public Archiver(String source, File indexDoneFile, File archiveFolder, int maxArchiveFiles) {
+ this.source = source;
+ this.indexDoneFile = indexDoneFile;
+ this.archiveFolder = archiveFolder;
+ this.maxArchiveFiles = maxArchiveFiles;
+ }
+
+ public void archive(IndexRecord indexRecord) {
+ moveToArchiveDir(indexRecord);
+
+ removeOldFiles();
+ }
+
+ private void moveToArchiveDir(IndexRecord indexRecord) {
+ File spoolFile = null;
+ File archiveFile = null;
+
+ try {
+ spoolFile = new File(indexRecord.getPath());
+ archiveFile = new File(archiveFolder, spoolFile.getName());
+
+ LOG.info("{}: moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile);
+
+ FileUtils.moveFile(spoolFile, archiveFile);
+ } catch (FileNotFoundException excp) {
+ LOG.warn("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp);
+ } catch (IOException excp) {
+ LOG.error("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp);
+ }
+ }
+
+ private void removeOldFiles() {
+ try {
+ File[] logFiles = archiveFolder == null ? null : archiveFolder.listFiles(pathname -> StringUtils.endsWithIgnoreCase(pathname.getName(), SpoolUtils.FILE_EXT_LOG));
+ int filesToDelete = logFiles == null ? 0 : logFiles.length - maxArchiveFiles;
+
+ if (filesToDelete > 0) {
+ try (BufferedReader br = new BufferedReader(new FileReader(indexDoneFile))) {
+ int filesDeletedCount = 0;
+
+ for (String line = br.readLine(); line != null; line = br.readLine()) {
+ line = line.trim();
+
+ if (StringUtils.isEmpty(line)) {
+ continue;
+ }
+
+ try {
+ IndexRecord record = AtlasType.fromJson(line, IndexRecord.class);
+ File logFile = new File(record.getPath());
+ String fileName = logFile.getName();
+ File archiveFile = new File(archiveFolder, fileName);
+
+ if (!archiveFile.exists()) {
+ LOG.warn("archive file does not exist: {}", archiveFile);
+
+ continue;
+ }
+
+ LOG.info("Deleting archive file: {}", archiveFile);
+
+ boolean ret = archiveFile.delete();
+
+ if (!ret) {
+ LOG.error("{}: Error deleting archive file. File: {}", source, archiveFile);
+ } else {
+ filesDeletedCount++;
+ }
+
+ if (filesDeletedCount >= filesToDelete) {
+ break;
+ }
+ } catch (Exception excp) {
+ LOG.error("{}: Error deleting older archive file in index-record: {}", source, line, excp);
+ }
+ }
+
+ LOG.info("{}: Deleted: {} archived files", source, filesDeletedCount);
+ }
+ }
+ } catch(Exception exception){
+ LOG.error("{}: Error deleting older files from archive folder. Folder: {}", source, archiveFolder, exception);
+ }
+ }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
new file mode 100644
index 0000000..2d7d195
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * NotificationInterface implementation that sends messages through the wrapped
+ * notification handler and diverts them to a file-backed spooler when the handler
+ * fails or spooled messages are still pending. A background publisher thread is
+ * started during init().
+ */
+public class AtlasFileSpool implements NotificationInterface {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class);
+
+    private final AbstractNotification notificationHandler;
+    private final SpoolConfiguration   config;
+    private final IndexManagement      indexManagement;
+    private final Spooler              spooler;
+    private final Publisher            publisher;
+    private       Thread               publisherThread;
+    // null: init() not completed; true: init succeeded; false: init failed with AtlasException
+    private       Boolean              initDone = null;
+
+    /**
+     * @param configuration       configuration used to build the spool configuration
+     * @param notificationHandler handler that performs the actual send
+     */
+    public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) {
+        this.notificationHandler = notificationHandler;
+        this.config              = new SpoolConfiguration(configuration, notificationHandler.getClass().getSimpleName());
+        this.indexManagement     = new IndexManagement(config);
+        this.spooler             = new Spooler(config, indexManagement);
+        this.publisher           = new Publisher(config, indexManagement, notificationHandler);
+    }
+
+    /**
+     * Initializes index management and starts the publisher thread. Does nothing when a
+     * previous call already completed (successfully or with an AtlasException).
+     *
+     * @param source               name of the source, stored in the spool configuration
+     * @param failedMessagesLogger handed to the spooler when it is a FailedMessagesLogger; ignored otherwise
+     */
+    @Override
+    public void init(String source, Object failedMessagesLogger) {
+        LOG.info("==> AtlasFileSpool.init(source={})", source);
+
+        if (!isInitDone()) {
+            try {
+                config.setSource(source);
+
+                LOG.info("{}: Initialization: Starting...", this.config.getSourceName());
+
+                indexManagement.init();
+
+                if (failedMessagesLogger instanceof FailedMessagesLogger) {
+                    this.spooler.setFailedMessagesLogger((FailedMessagesLogger) failedMessagesLogger);
+                }
+
+                startPublisher();
+
+                initDone = true;
+            } catch (AtlasException exception) {
+                LOG.error("AtlasFileSpool(source={}): initialization failed", this.config.getSourceName(), exception);
+
+                initDone = false;
+            } catch (Throwable t) {
+                // initDone is left null here, so a later init() call will try again
+                // NOTE(review): confirm that retry-on-unknown-error is the intended behavior
+                LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t);
+            }
+        } else {
+            LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
+        }
+
+        LOG.info("<== AtlasFileSpool.init(source={})", source);
+    }
+
+    /** Passes the user name through to the wrapped notification handler. */
+    @Override
+    public void setCurrentUser(String user) {
+        this.notificationHandler.setCurrentUser(user);
+    }
+
+    /**
+     * Not implemented for the spool.
+     *
+     * @return always null
+     */
+    @Override
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
+        LOG.warn("AtlasFileSpool.createConsumers(): not implemented");
+
+        return null;
+    }
+
+    /** Varargs convenience overload; delegates to {@link #send(NotificationType, List)}. */
+    @Override
+    public <T> void send(NotificationType type, T... messages) throws NotificationException {
+        send(type, Arrays.asList(messages));
+    }
+
+    /**
+     * Sends the messages to the spooler when init succeeded and either spooled messages are
+     * pending or the destination is marked down; otherwise sends through the notification handler.
+     * A handler failure is redirected to the spooler once init() has completed.
+     *
+     * @throws NotificationException rethrown from the notification handler when init() has not completed
+     */
+    @Override
+    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasFileSpool.send(): sending to spooler");
+            }
+
+            spooler.send(type, messages);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
+            }
+
+            try {
+                notificationHandler.send(type, messages);
+            } catch (Exception e) {
+                if (isInitDone()) {
+                    LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
+
+                    publisher.setDestinationDown();
+
+                    spooler.send(type, messages);
+                } else {
+                    LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e);
+
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Signals the spooler and publisher to drain, stops index management and waits for the
+     * publisher thread to finish.
+     */
+    @Override
+    public void close() {
+        try {
+            spooler.setDrain();
+            publisher.setDrain();
+            indexManagement.stop();
+
+            // the thread only exists after startPublisher(); it is still null when init()
+            // was never called or failed before reaching that step
+            if (publisherThread != null) {
+                publisherThread.join();
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted! source={}", this.config.getSourceName(), e);
+
+            // restore the interrupt flag so callers further up can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /** Creates and starts the daemon thread that runs the publisher. */
+    private void startPublisher() {
+        publisherThread = new Thread(publisher);
+
+        publisherThread.setDaemon(true);
+        publisherThread.setContextClassLoader(this.getClass().getClassLoader());
+        publisherThread.start();
+
+        LOG.info("{}: publisher started!", this.config.getSourceName());
+    }
+
+    /** True once init() has completed, whether it succeeded or failed with an AtlasException. */
+    private boolean isInitDone() {
+        return this.initDone != null;
+    }
+
+    /** True only when init() completed successfully. */
+    private boolean hasInitSucceeded() {
+        return Boolean.TRUE.equals(this.initDone);
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java b/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java
new file mode 100644
index 0000000..538ea49
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.utils.local.FileOpAppend;
+import org.apache.atlas.notification.spool.utils.local.FileOpCompaction;
+import org.apache.atlas.notification.spool.utils.local.FileOpDelete;
+import org.apache.atlas.notification.spool.utils.local.FileOpRead;
+import org.apache.atlas.notification.spool.utils.local.FileOpUpdate;
+
+import java.io.File;
+
+/**
+ * Facade over the individual file operations (append, read, update, compaction, delete),
+ * all created for the same source.
+ */
+public class FileOperations {
+    private final String           emptyRecord;
+    private final FileOpAppend     appendOp;
+    private final FileOpRead       readOp;
+    private final FileOpUpdate     updateOp;
+    private final FileOpCompaction compactionOp;
+    private final FileOpDelete     deleteOp;
+
+    /**
+     * @param emptyRecordJson JSON passed to the delete operation along with the record id
+     * @param source          source name handed to every underlying operation
+     */
+    public FileOperations(String emptyRecordJson, String source) {
+        emptyRecord  = emptyRecordJson;
+        appendOp     = new FileOpAppend(source);
+        readOp       = new FileOpRead(source);
+        updateOp     = new FileOpUpdate(source, appendOp); // the update operation is given the append operation
+        compactionOp = new FileOpCompaction(source);
+        deleteOp     = new FileOpDelete(source);
+    }
+
+    /**
+     * Runs the read operation on the file.
+     *
+     * @return the items exposed by the read operation after it has run
+     */
+    public String[] load(File file) {
+        readOp.perform(file);
+
+        final String[] items = readOp.getItems();
+
+        return items;
+    }
+
+    /** Runs the delete operation for the record with the given id. */
+    public void delete(File file, String id) {
+        deleteOp.perform(file, id, emptyRecord);
+    }
+
+    /** Runs the append operation with the given JSON. */
+    public void append(File file, String json) {
+        appendOp.perform(file, json);
+    }
+
+    /** Runs the compaction operation on the file. */
+    public void compact(File file) {
+        compactionOp.perform(file);
+    }
+
+    /** Sets the target id on the update operation, then runs it with the given JSON. */
+    public void update(File file, String id, String json) {
+        updateOp.setId(id);
+        updateOp.perform(file, json);
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
new file mode 100644
index 0000000..b3a586b
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.models.IndexRecords;
+import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class IndexManagement {
+ private static final Logger LOG = LoggerFactory.getLogger(IndexManagement.class);
+
+ private static final int MAX_RETRY_ATTEMPTS = 3;
+
+ private final SpoolConfiguration config;
+ private IndexFileManager indexFileManager;
+ private IndexReader indexReader;
+ private IndexWriter indexWriter;
+
+ /**
+  * Stores the spool configuration. The file manager, reader and writer are
+  * not created here; they are assigned later in performInit().
+  *
+  * @param config spool configuration used by init() and performInit()
+  */
+ public IndexManagement(SpoolConfiguration config) {
+ this.config = config;
+ }
+
+ /**
+  * Ensures the spool directory, archive directory, index file and index-done
+  * file exist, then builds the index file manager, reader and writer.
+  *
+  * @throws IOException    if an index file cannot be created
+  * @throws AtlasException if a directory or file is reported as unavailable (null)
+  */
+ public void init() throws IOException, AtlasException {
+     String sourceName = config.getSourceName();
+
+     // The error messages are built from the configured File objects; the value
+     // returned by SpoolUtils is null on these paths and must not be dereferenced.
+     File configuredSpoolDir = config.getSpoolDir();
+
+     if (SpoolUtils.getCreateDirectory(configuredSpoolDir) == null) {
+         throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, configuredSpoolDir.getAbsolutePath()));
+     }
+
+     File configuredArchiveDir = config.getArchiveDir();
+
+     if (SpoolUtils.getCreateDirectory(configuredArchiveDir) == null) {
+         throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, configuredArchiveDir.getAbsolutePath()));
+     }
+
+     File configuredIndexFile = config.getIndexFile();
+     File indexFile           = SpoolUtils.getCreateFile(configuredIndexFile, sourceName);
+
+     if (indexFile == null) {
+         throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, configuredIndexFile.getAbsolutePath()));
+     }
+
+     File configuredIndexDoneFile = config.getIndexDoneFile();
+
+     if (SpoolUtils.getCreateFile(configuredIndexDoneFile, sourceName) == null) {
+         throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, configuredIndexDoneFile.getAbsolutePath()));
+     }
+
+     performInit(indexFile.getAbsolutePath(), sourceName);
+ }
+
+ /**
+  * Creates the index file manager, reader and writer from the configuration.
+  * A failure is logged rather than rethrown; in that case some or all of the
+  * three collaborators remain unassigned.
+  *
+  * @param indexFilePath index file path, used only in the failure log message
+  * @param source        source name passed to the created collaborators
+  */
+ @VisibleForTesting
+ void performInit(String indexFilePath, String source) {
+     try {
+         File spoolDir      = config.getSpoolDir();
+         File archiveDir    = config.getArchiveDir();
+         File indexFile     = config.getIndexFile();
+         File indexDoneFile = config.getIndexDoneFile();
+
+         indexFileManager = new IndexFileManager(source, indexFile, indexDoneFile, archiveDir, config.getMaxArchiveFiles());
+         indexReader      = new IndexReader(source, indexFileManager, config.getRetryDestinationMS());
+         indexWriter      = new IndexWriter(source, config, indexFileManager, indexReader, spoolDir, archiveDir, config.getFileRolloverSec());
+     } catch (Exception e) {
+         // The exception is passed as the last argument so the root cause and stack trace are logged.
+         LOG.error("{}: init: Failed! Error loading records from index file: {}", config.getSourceName(), indexFilePath, e);
+     }
+ }
+
+ /**
+  * Reports whether work is outstanding: either the publish queue is non-empty,
+  * or the writer has a current record whose line value is greater than zero.
+  */
+ public boolean isPending() {
+     if (!indexReader.isEmpty()) {
+         return true;
+     }
+
+     if (indexWriter.getCurrent() == null) {
+         return false;
+     }
+
+     return indexWriter.getCurrent().getLine() > 0;
+ }
+
+ /**
+  * Returns the writer obtained from IndexWriter.getCreateWriter().
+  *
+  * @throws IOException propagated from the index writer
+  */
+ public synchronized DataOutput getSpoolWriter() throws IOException {
+ return indexWriter.getCreateWriter();
+ }
+
+ /** Sets the index writer's write-in-progress flag to true. */
+ public void setSpoolWriteInProgress() {
+ this.indexWriter.setFileWriteInProgress(true);
+ }
+
+ /** Sets the index writer's write-in-progress flag back to false. */
+ public void resetSpoolWriteInProgress() {
+ this.indexWriter.setFileWriteInProgress(false);
+ }
+
+ /** Delegates to the index reader, which records a failed attempt on its current record. */
+ public void updateFailedAttempt() {
+ this.indexReader.updateFailedAttempt();
+ }
+
+ /**
+  * Returns the next record from the index reader's publish queue.
+  * The reader polls with a timeout, so the result may be null.
+  *
+  * @throws InterruptedException if interrupted while waiting on the queue
+  */
+ public IndexRecord next() throws InterruptedException {
+ return indexReader.next();
+ }
+
+ /** Returns the number of records currently held by the index reader's queue. */
+ public int getQueueSize() {
+ return indexReader.size();
+ }
+
+ /**
+  * Has the index reader mark the record as done and remove it, then gives
+  * the writer a chance to roll over its current spool file.
+  */
+ public void removeAsDone(IndexRecord indexRecord) {
+ this.indexReader.removeAsDone(indexRecord);
+ this.indexWriter.rolloverIfNeeded();
+ }
+
+ /** Stops the index writer (see IndexWriter.stop()). */
+ public void stop() {
+ indexWriter.stop();
+ }
+
+ /** Asks the index writer to roll over its current spool file if its rollover condition is met. */
+ public void rolloverSpoolFileIfNeeded() {
+ this.indexWriter.rolloverIfNeeded();
+ }
+
+ /** Returns the index file manager created by performInit(); exposed for tests. */
+ @VisibleForTesting
+ IndexFileManager getIndexFileManager() {
+ return this.indexFileManager;
+ }
+
+ /** Writes the given record's current state to the index file via the index file manager. */
+ public void update(IndexRecord record) {
+ this.indexFileManager.updateIndex(record);
+ }
+
+ /**
+  * Flushes the index writer's current spool writer, if one is open.
+  *
+  * @throws IOException propagated from the flush
+  */
+ public void flushSpoolWriter() throws IOException {
+ this.indexWriter.flushCurrent();
+ }
+
+ /**
+  * Owns the spool file currently being written. It creates a new spool file
+  * (and its index record) on demand, rolls the file over once it is older than
+  * the rollover timeout, and hands rolled-over records to the reader's queue.
+  */
+ static class IndexWriter {
+     private final String              source;
+     private final SpoolConfiguration  config;
+     private final File                spoolFolder;
+     private final File                archiveFolder;
+     private final int                 rollOverTimeout;
+     private final IndexFileManager    indexFileManager;
+     private final IndexReader         indexReader;
+     private final FileLockedReadWrite fileLockedReadWrite;
+     private IndexRecord               currentIndexRecord;
+     private DataOutput                currentWriter;
+     // stop() polls this flag while sleeping; volatile makes a change made by
+     // another thread visible to that loop.
+     // NOTE(review): assumes the flag is set from a different thread than the one calling stop() - confirm.
+     private volatile boolean          fileWriteInProgress;
+
+     /**
+      * Creates the writer. The current record is initialised to the first
+      * write-in-progress record found in the index file, if any.
+      *
+      * @param rollOverTimeout age after which the current file is rolled over; compared
+      *                        against a System.currentTimeMillis() difference
+      */
+     public IndexWriter(String source, SpoolConfiguration config, IndexFileManager indexFileManager,
+                        IndexReader indexReader,
+                        File spoolFolder, File archiveFolder, int rollOverTimeout) {
+         this.source              = source;
+         this.config              = config;
+         this.indexFileManager    = indexFileManager;
+         this.indexReader         = indexReader;
+         this.spoolFolder         = spoolFolder;
+         this.archiveFolder       = archiveFolder;
+         this.rollOverTimeout     = rollOverTimeout;
+         this.fileLockedReadWrite = new FileLockedReadWrite(source);
+
+         setCurrent(indexFileManager.getFirstWriteInProgressRecord());
+     }
+
+     /** Replaces the record describing the spool file being written. */
+     public void setCurrent(IndexRecord indexRecord) {
+         this.currentIndexRecord = indexRecord;
+     }
+
+     /** Returns the record describing the spool file being written, or null when there is none. */
+     public IndexRecord getCurrent() {
+         return this.currentIndexRecord;
+     }
+
+     private void setCurrentWriter(File file) throws IOException {
+         this.currentWriter = fileLockedReadWrite.getOutput(file);
+     }
+
+     /** Returns the open writer, or null if no spool file is currently open. */
+     public synchronized DataOutput getWriter() {
+         return this.currentWriter;
+     }
+
+     /**
+      * Returns a writer for the current spool file. A rollover is attempted
+      * first. If there is no current record, a new record and spool file are
+      * created and registered in the index; otherwise the existing file is
+      * reopened when no writer is open.
+      *
+      * @throws IOException if the spool file cannot be opened
+      */
+     public synchronized DataOutput getCreateWriter() throws IOException {
+         rolloverIfNeeded();
+
+         if (getCurrent() == null) {
+             IndexRecord record   = new IndexRecord(StringUtils.EMPTY);
+             String      filePath = SpoolUtils.getSpoolFilePath(config, spoolFolder.toString(), archiveFolder.toString(), record.getId());
+
+             record.setPath(filePath);
+
+             indexFileManager.appendToIndexFile(record);
+
+             setCurrent(record);
+
+             LOG.info("IndexWriter.getCreateWriter(source={}): Creating new spool file. File: {}", this.source, filePath);
+
+             setCurrentWriter(new File(filePath));
+         } else if (this.currentWriter == null) {
+             LOG.info("IndexWriter.getCreateWriter(source={}): Opening existing file for append: File: {}", this.source, currentIndexRecord.getPath());
+
+             setCurrentWriter(new File(currentIndexRecord.getPath()));
+         }
+
+         return currentWriter;
+     }
+
+     /**
+      * If a writer is open and the current record is older than the rollover
+      * timeout: closes the file, marks the record pending, persists it to the
+      * index, queues it for publishing and clears the current record.
+      */
+     public synchronized void rolloverIfNeeded() {
+         if (currentWriter == null || !shouldRolloverSpoolFile()) {
+             return;
+         }
+
+         LOG.info("IndexWriter.rolloverIfNeeded(source={}): Rolling over. Closing File: {}", this.config.getSourceName(), currentIndexRecord.getPath());
+
+         fileLockedReadWrite.close();
+
+         currentWriter = null;
+
+         currentIndexRecord.setStatusPending();
+
+         indexFileManager.updateIndex(currentIndexRecord);
+
+         LOG.info("IndexWriter.rolloverIfNeeded(source={}): Adding file to queue. File: {}", this.config.getSourceName(), currentIndexRecord.getPath());
+
+         indexReader.addToPublishQueue(currentIndexRecord);
+
+         currentIndexRecord = null;
+     }
+
+     private boolean shouldRolloverSpoolFile() {
+         return currentIndexRecord != null &&
+                 (System.currentTimeMillis() - currentIndexRecord.getCreated() > this.rollOverTimeout);
+     }
+
+     /** Flushes the underlying file if a writer is open; otherwise does nothing. */
+     void flushCurrent() throws IOException {
+         if (getWriter() != null) {
+             fileLockedReadWrite.flush();
+         }
+     }
+
+     public void setFileWriteInProgress(boolean val) {
+         this.fileWriteInProgress = val;
+     }
+
+     public boolean isWriteInProgress() {
+         return this.fileWriteInProgress;
+     }
+
+     /** Path of the current record for log messages; null-safe so a handler cannot itself throw. */
+     private String getCurrentPath() {
+         IndexRecord current = getCurrent();
+
+         return current != null ? current.getPath() : null;
+     }
+
+     /**
+      * Flushes and closes the open spool file, if any, and marks its record
+      * pending in the index. While a write is reported in progress the close
+      * is retried up to MAX_RETRY_ATTEMPTS times, sleeping {@code i} seconds
+      * before retry {@code i} (so the first retry is immediate). Errors are
+      * logged, never thrown.
+      */
+     public void stop() {
+         LOG.info("==> IndexWriter.stop(source={})", this.config.getSourceName());
+
+         try {
+             if (getWriter() != null) {
+                 flushCurrent();
+
+                 boolean closed      = false;
+                 boolean interrupted = false;
+
+                 for (int i = 0; i < MAX_RETRY_ATTEMPTS && !closed && !interrupted; i++) {
+                     if (isWriteInProgress()) {
+                         try {
+                             TimeUnit.SECONDS.sleep(i);
+                         } catch (InterruptedException e) {
+                             LOG.error("IndexWriter.stop(source={}): Interrupted!", this.config.getSourceName(), e);
+
+                             // Restore the interrupt status so callers further up can observe it.
+                             Thread.currentThread().interrupt();
+
+                             interrupted = true;
+                         }
+
+                         continue;
+                     }
+
+                     LOG.info("IndexWriter.stop(source={}): Closing open file.", this.config.getSourceName());
+
+                     fileLockedReadWrite.close();
+                     currentIndexRecord.setStatusPending();
+                     indexFileManager.updateIndex(currentIndexRecord);
+
+                     closed = true;
+                 }
+
+                 if (!closed && !interrupted) {
+                     LOG.warn("IndexWriter.stop(source={}): write still in progress after {} attempts; file left open: {}", this.config.getSourceName(), MAX_RETRY_ATTEMPTS, getCurrentPath());
+                 }
+             }
+         } catch (FileNotFoundException e) {
+             LOG.error("IndexWriter.stop(source={}): File not found! {}", this.config.getSourceName(), getCurrentPath(), e);
+         } catch (IOException exception) {
+             LOG.error("IndexWriter.stop(source={}): Error accessing file: {}", this.config.getSourceName(), getCurrentPath(), exception);
+         } catch (Exception exception) {
+             LOG.error("IndexWriter.stop(source={}): Error closing spool file.", this.config.getSourceName(), exception);
+         }
+
+         LOG.info("<== IndexWriter.stop(source={})", this.config.getSourceName());
+     }
+ }
+
+ /**
+  * Holds the queue of index records waiting to be published. On construction
+  * the queue is seeded from the index file: first the records whose status is
+  * read-in-progress, then the pending ones.
+  */
+ static class IndexReader {
+     private final String                     source;
+     private final BlockingQueue<IndexRecord> blockingQueue;
+     private final IndexFileManager           indexFileManager;
+     private final long                       retryDestinationMS;
+     private IndexRecord                      currentIndexRecord;
+
+     /**
+      * @param retryDestinationMS how long next() waits on an empty queue, in milliseconds
+      */
+     public IndexReader(String source, IndexFileManager indexFileManager, long retryDestinationMS) {
+         this.source             = source;
+         this.blockingQueue      = new LinkedBlockingQueue<>();
+         this.retryDestinationMS = retryDestinationMS;
+         this.indexFileManager   = indexFileManager;
+
+         List<IndexRecord> records = indexFileManager.getRecords();
+
+         // Two passes, so read-in-progress records are queued ahead of pending ones.
+         for (IndexRecord record : records) {
+             addIfStatus(record, IndexRecord.STATUS_READ_IN_PROGRESS);
+         }
+
+         for (IndexRecord record : records) {
+             addIfStatus(record, IndexRecord.STATUS_PENDING);
+         }
+     }
+
+     /** Queues the record when its status matches and its spool file exists; logs when the file is missing. */
+     private void addIfStatus(IndexRecord record, String status) {
+         // Compare from the known-non-null side so a record without a status is
+         // skipped instead of throwing and aborting construction.
+         if (record == null || status == null || !status.equals(record.getStatus())) {
+             return;
+         }
+
+         if (SpoolUtils.fileExists(record)) {
+             addToPublishQueue(record);
+         } else {
+             LOG.error("IndexReader.addIfStatus(source={}): file {} not found!", this.source, record.getPath());
+         }
+     }
+
+     /** Adds the record to the publish queue unless the queue already contains it. */
+     public void addToPublishQueue(IndexRecord record) {
+         try {
+             if (!blockingQueue.contains(record)) {
+                 blockingQueue.add(record);
+             }
+         } catch (OverlappingFileLockException lockException) {
+             // NOTE(review): nothing visible in this block acquires a file lock; this handler may be unreachable - confirm.
+             LOG.warn("{}: {}: Someone else has locked the file.", source, record.getPath());
+         }
+     }
+
+     /**
+      * Waits up to retryDestinationMS for the next queued record and remembers
+      * it as the current record.
+      *
+      * @return the next record, or null if the wait timed out
+      * @throws InterruptedException if interrupted while waiting
+      */
+     public IndexRecord next() throws InterruptedException {
+         this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS);
+
+         return this.currentIndexRecord;
+     }
+
+     public int size() {
+         return blockingQueue.size();
+     }
+
+     public boolean isEmpty() {
+         return blockingQueue.isEmpty();
+     }
+
+     /** Records a failed attempt on the record last returned by next(), if any, and persists it. */
+     public void updateFailedAttempt() {
+         if (currentIndexRecord != null) {
+             currentIndexRecord.updateFailedAttempt();
+
+             indexFileManager.updateIndex(currentIndexRecord);
+         }
+     }
+
+     /** Marks the record done and removes it from the index file through the file manager. */
+     public void removeAsDone(IndexRecord indexRecord) {
+         indexRecord.setDone();
+
+         indexFileManager.remove(indexRecord);
+     }
+ }
+
+ /**
+  * Reads and writes the index file and the index-done file through
+  * FileOperations, and passes finished records to the Archiver.
+  */
+ static class IndexFileManager {
+     private final String         source;
+     private final File           indexDoneFile;
+     private final File           indexFile;
+     private final Archiver       archiver;
+     private final FileOperations fileOperations;
+
+     public IndexFileManager(String source, File indexFile, File indexDoneFile, File archiveFolder, int maxArchiveFiles) {
+         this.source         = source;
+         this.indexFile      = indexFile;
+         this.indexDoneFile  = indexDoneFile;
+         this.archiver       = new Archiver(source, indexDoneFile, archiveFolder, maxArchiveFiles);
+         this.fileOperations = new FileOperations(SpoolUtils.getEmptyRecordForWriting(), source);
+     }
+
+     /** Returns a fresh list with every record currently loadable from the index file. */
+     public List<IndexRecord> getRecords() {
+         IndexRecords loaded = loadRecords(indexFile);
+
+         return new ArrayList<>(loaded.getRecords().values());
+     }
+
+     /** Deletes the record with the given id from {@code file}. */
+     public synchronized void delete(File file, String id) {
+         fileOperations.delete(file, id);
+     }
+
+     /**
+      * Scans the index file for the first record whose status is
+      * write-in-progress.
+      *
+      * @return that record, or null when there is none
+      */
+     public synchronized IndexRecord getFirstWriteInProgressRecord() {
+         IndexRecords loaded = loadRecords(indexFile);
+
+         if (loaded == null) {
+             return null;
+         }
+
+         for (IndexRecord candidate : loaded.getRecords().values()) {
+             if (!candidate.isStatusWriteInProgress()) {
+                 continue;
+             }
+
+             LOG.info("IndexFileManager.getFirstWriteInProgressRecord(source={}): current file={}", this.source, candidate.getPath());
+
+             return candidate;
+         }
+
+         return null;
+     }
+
+     /**
+      * Removes the record from the index file, appends it to the done file
+      * (which also archives it), and compacts the index file once no records
+      * remain in it.
+      */
+     public synchronized void remove(IndexRecord record) {
+         delete(indexFile, record.getId());
+
+         appendToDoneFile(record);
+
+         boolean indexIsEmpty = loadRecords(indexFile).size() == 0;
+
+         if (indexIsEmpty) {
+             LOG.info("IndexFileManager.remove(source={}): All done!", this.source);
+
+             compactFile(indexFile);
+         }
+     }
+
+     /** Appends the serialized record to the index file. */
+     public void appendToIndexFile(IndexRecord record) {
+         String line = SpoolUtils.getRecordForWriting(record);
+
+         fileOperations.append(indexFile, line);
+     }
+
+     /** Rewrites the index-file entry identified by the record's id with the record's current state. */
+     public void updateIndex(IndexRecord record) {
+         String line = SpoolUtils.getRecordForWriting(record);
+
+         fileOperations.update(indexFile, record.getId(), line);
+     }
+
+     /** Compacts {@code file}; the completion message is logged even when compaction throws. */
+     private void compactFile(File file) {
+         String absolutePath = file.getAbsolutePath();
+
+         LOG.info("IndexFileManager.compactFile(source={}): compacting file {}", source, absolutePath);
+
+         try {
+             fileOperations.compact(file);
+         } finally {
+             LOG.info("IndexFileManager.compactFile(source={}): done compacting file {}", source, file.getAbsolutePath());
+         }
+     }
+
+     /** Appends the serialized record to the done file, then passes the record to the archiver. */
+     private void appendToDoneFile(IndexRecord indexRecord) {
+         fileOperations.append(indexDoneFile, SpoolUtils.getRecordForWriting(indexRecord));
+
+         archiver.archive(indexRecord);
+     }
+
+     /** Loads the raw lines of {@code file} and converts them to records. */
+     @VisibleForTesting
+     IndexRecords loadRecords(File file) {
+         return SpoolUtils.createRecords(fileOperations.load(file));
+     }
+
+     @VisibleForTesting
+     File getDoneFile() {
+         return this.indexDoneFile;
+     }
+
+     @VisibleForTesting
+     File getIndexFile() {
+         return this.indexFile;
+     }
+
+     /** Test helper: creates a record for {@code path}, appends it to the index file and returns it. */
+     @VisibleForTesting
+     IndexRecord add(String path) {
+         IndexRecord created = new IndexRecord(path);
+
+         appendToIndexFile(created);
+
+         return created;
+     }
+ }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
new file mode 100644
index 0000000..2947a21
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class Publisher implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
+
+ private final SpoolConfiguration configuration;
+ private final IndexManagement indexManagement;
+ private final AbstractNotification notificationHandler;
+ private final String notificationHandlerName;
+ private final int retryDestinationMS;
+ private final int messageBatchSize;
+ private String source;
+ private boolean isDrain;
+ private boolean isDestDown;
+
+ public Publisher(SpoolConfiguration configuration, IndexManagement indexManagement, AbstractNotification notificationHandler) {
+ this.configuration = configuration;
+ this.indexManagement = indexManagement;
+ this.notificationHandler = notificationHandler;
+ this.notificationHandlerName = notificationHandler.getClass().getSimpleName();
+ this.retryDestinationMS = configuration.getRetryDestinationMS();
+ this.messageBatchSize = configuration.getMessageBatchSize();
+ }
+
+ public void run() {
+ this.source = configuration.getSourceName();
+
+ LOG.info("Publisher.run(source={}): starting publisher {}", this.source, notificationHandlerName);
+
+ try {
+ IndexRecord record = null;
+
+ while (true) {
+ waitIfDestinationDown();
+
+ if (this.isDrain) {
+ break;
+ }
+
+ record = fetchNext(record);
+
+ if (record != null && processAndDispatch(record)) {
+ indexManagement.removeAsDone(record);
+
+ record = null;
+ } else {
+ indexManagement.rolloverSpoolFileIfNeeded();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Publisher.run(source={}): {}: Publisher: Shutdown might be in progress!", this.source, notificationHandlerName);
+ } catch (Exception e) {
+ LOG.error("Publisher.run(source={}): {}: Publisher: Exception in destination writing!", this.source, notificationHandlerName, e);
+ }
+
+ LOG.info("Publisher.run(source={}): publisher {} exited!", this.source, notificationHandlerName);
+ }
+
+ public void setDestinationDown() {
+ this.isDestDown = true;
+
+ this.indexManagement.updateFailedAttempt();
+ }
+
+ public void setDrain() {
+ this.isDrain = true;
+ }
+
+ public boolean isDestinationDown() {
+ return isDestDown;
+ }
+
+ private void waitIfDestinationDown() throws InterruptedException {
+ if (isDestDown) {
+ LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items",
+ this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize());
+
+ Thread.sleep(retryDestinationMS);
+ }
+
+ }
+
+ private IndexRecord fetchNext(IndexRecord record) {
+ if (record == null) {
+ try {
+ record = indexManagement.next();
+ } catch (Exception e) {
+ LOG.error("Publisher.fetchNext(source={}): failed!. publisher={}", this.source, notificationHandlerName, e);
+ }
+ }
+
+ return record;
+ }
+
+ @VisibleForTesting
+ boolean processAndDispatch(IndexRecord record) throws IOException {
+ boolean ret = true;
+
+ if (SpoolUtils.fileExists(record)) {
+ FileLockedReadWrite fileLockedRead = new FileLockedReadWrite(source);
+
+ try {
+ DataInput dataInput = fileLockedRead.getInput(new File(record.getPath()));
+ int lineInSpoolFile = 0;
+ List<String> messages = new ArrayList<>();
+
+ for (String message = dataInput.readLine(); message != null; message = dataInput.readLine()) {
+ lineInSpoolFile++;
+
+ if (lineInSpoolFile < record.getLine()) {
+ continue;
+ }
+
+ messages.add(message);
+
+ if (messages.size() == messageBatchSize) {
+ dispatch(record, lineInSpoolFile, messages);
+ }
+ }
+
+ dispatch(record, lineInSpoolFile, messages);
+
+ LOG.info("Publisher.processAndDispatch(source={}): consumer={}: done reading file {}", this.source, notificationHandlerName, record.getPath());
+
+ ret = true;
+ } catch (OverlappingFileLockException ex) {
+ LOG.error("Publisher.processAndDispatch(source={}): consumer={}: some other process has locked this file {}", this.source, notificationHandlerName, record.getPath(), ex);
+ ret = false;
+ } catch (FileNotFoundException ex) {
+ LOG.error("Publisher.processAndDispatch(source={}): consumer={}: file not found {}", this.source, notificationHandlerName, record.getPath(), ex);
+ ret = true;
+ } catch (Exception ex) {
+ LOG.error("Publisher.processAndDispatch(source={}): consumer={}: failed for file {}", this.source, notificationHandlerName, record.getPath(), ex);
+ ret = false;
+ } finally {
+ fileLockedRead.close();
+ }
+ } else {
+ LOG.error("Publisher.processAndDispatch(source={}): publisher={}: file '{}' not found!", this.source, notificationHandlerName, record.getPath());
+
+ ret = true;
+ }
+
+ return ret;
+ }
+
+ private void dispatch(IndexRecord record, int lineInSpoolFile, List<String> messages) throws Exception {
+ if (notificationHandler == null || messages == null || messages.size() == 0) {
+ LOG.error("Publisher.dispatch(source={}): consumer={}: error sending logs", this.source, notificationHandlerName);
+ } else {
+ dispatch(record.getPath(), messages);
+
+ record.setCurrentLine(lineInSpoolFile);
+ indexManagement.update(record);
+ isDestDown = false;
+ }
+ }
+
+ private void dispatch(String filePath, List<String> messages) throws Exception {
+ try {
+ notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages);
+
+ if (isDestDown) {
+ LOG.info("Publisher.dispatch(source={}): consumer={}: destination is now up. file={}", this.source, notificationHandlerName, filePath);
+ }
+ } catch (Exception exception) {
+ setDestinationDown();
+
+ LOG.error("Publisher.dispatch(source={}): consumer={}: error while sending logs to consumer", this.source, notificationHandlerName, exception);
+
+ throw new NotificationException(exception, String.format("%s: %s: Publisher: Destination down!", this.source, notificationHandlerName));
+ } finally {
+ messages.clear();
+ }
+ }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
new file mode 100644
index 0000000..a9a3a78
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.commons.configuration.Configuration;
+
+import java.io.File;
+
+public class SpoolConfiguration {
+ private static final int PROP_RETRY_DESTINATION_MS_DEFAULT = 30000; // Default 30 seconds
+ private static final int PROP_FILE_ROLLOVER_SEC_DEFAULT = 60; // 60 secs
+ private static final int PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100;
+ private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT = "archive";
+ private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT = "/tmp/spool";
+ private static final int PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT = 100;
+ private static final String PROPERTY_PREFIX_SPOOL = "atlas.hook.spool.";
+ public static final String PROP_FILE_SPOOL_LOCAL_DIR = PROPERTY_PREFIX_SPOOL + "dir";
+ private static final String PROP_FILE_SPOOL_ARCHIVE_DIR = PROPERTY_PREFIX_SPOOL + "archive.dir";
+ private static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = PROPERTY_PREFIX_SPOOL + "archive.max.files";
+ public static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC = PROPERTY_PREFIX_SPOOL + "file.rollover.sec";
+ public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = PROPERTY_PREFIX_SPOOL + "destination.retry.ms";
+ private static final String PROP_MESSAGE_BATCH_SIZE = PROPERTY_PREFIX_SPOOL + "destination.message.batchsize";
+
+ private final String messageHandlerName;
+ private final int maxArchivedFilesCount;
+ private final int messageBatchSize;
+ private final int retryDestinationMS;
+ private final int fileRollOverSec;
+ private final int fileSpoolMaxFilesCount;
+ private final String spoolDirPath;
+ private final String archiveDir;
+ private String sourceName;
+
+ public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
+ this.messageHandlerName = messageHandlerName;
+ this.maxArchivedFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
+ this.messageBatchSize = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
+ this.retryDestinationMS = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT);
+ this.fileRollOverSec = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000;
+ this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
+ this.spoolDirPath = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT);
+ this.archiveDir = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
+ }
+
+ public void setSource(String val) {
+ this.sourceName = val;
+ }
+
+ public String getSourceName() {
+ return this.sourceName;
+ }
+
+ public int getMaxArchiveFiles() {
+ return maxArchivedFilesCount;
+ }
+
+ public int getRetryDestinationMS() {
+ return retryDestinationMS;
+ }
+
+ public int getFileRolloverSec() {
+ return this.fileRollOverSec;
+ }
+
+ public int getFileSpoolMaxFilesCount() {
+ return fileSpoolMaxFilesCount;
+ }
+
+ public String getSpoolDirPath() {
+ return spoolDirPath;
+ }
+
+ public File getSpoolDir() {
+ return new File(getSpoolDirPath());
+ }
+
+ public File getArchiveDir() {
+ return new File(this.archiveDir);
+ }
+
+ public String getMessageHandlerName() {
+ return this.messageHandlerName;
+ }
+
+ public int getMessageBatchSize() {
+ return messageBatchSize;
+ }
+
+ public File getIndexFile() {
+ String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
+
+ return new File(getSpoolDir(), fileName);
+ }
+
+ public File getIndexDoneFile() {
+ String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
+ String fileDoneName = SpoolUtils.getIndexDoneFile(fileName);
+
+ return new File(getSpoolDir(), fileDoneName);
+ }
+
+ public File getIndexPublishFile() {
+ String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
+ String fileDoneName = SpoolUtils.getIndexPublishFile(fileName);
+
+ return new File(getSpoolDir(), fileDoneName);
+ }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
new file mode 100644
index 0000000..abbe33d
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.models.IndexRecords;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+
+ /**
+  * Static helpers for the file spool: creating files and directories,
+  * building index and spool file names, and converting index records to and
+  * from their fixed-width text form.
+  */
+ public class SpoolUtils {
+     private static final Logger LOG = LoggerFactory.getLogger(SpoolUtils.class);
+
+     public static final String  DEFAULT_CHAR_SET              = "UTF-8";
+     private static final String DEFAULT_LINE_SEPARATOR        = System.getProperty("line.separator");
+     private static final String FILE_EXT_JSON                 = ".json";
+     public static final String  FILE_EXT_LOG                  = ".log";
+     private static final String SPOOL_FILE_NAME_FORMAT_PREFIX = "%s.%s%s";
+     private static final String INDEX_FILE_CLOSED_SUFFIX      = "_closed.json";
+     private static final String INDEX_FILE_PUBLISH_SUFFIX     = "_publish.json";
+     private static final String INDEX_FILE_NAME_FORMAT        = "index-%s-%s" + FILE_EXT_JSON;
+     private static final String SPOOL_FILE_NAME_FORMAT        = "spool-%s-%s-%s" + FILE_EXT_LOG;
+     private static final String RECORD_EMPTY                  = StringUtils.leftPad(StringUtils.EMPTY, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator();
+
+     /**
+      * Creates {@code file} if it does not exist and returns it. The same File
+      * is returned whether or not creation succeeded.
+      *
+      * @throws IOException if the file cannot be created
+      */
+     public static File getCreateFile(File file, String source) throws IOException {
+         if (createFileIfNotExists(file, source)) {
+             LOG.info("SpoolUtils.getCreateFile(source={}): file={}", source, file.getAbsolutePath());
+         }
+
+         return file;
+     }
+
+     /**
+      * @return true if the file already existed or was created by this call;
+      *         false if createNewFile() reported that it did not create it
+      * @throws IOException if the file cannot be created
+      */
+     public static boolean createFileIfNotExists(File file, String source) throws IOException {
+         if (file.exists()) {
+             return true;
+         }
+
+         boolean created = file.createNewFile();
+
+         if (!created) {
+             LOG.error("SpoolUtils.createFileIfNotExists(source={}): error creating file {}", source, file.getPath());
+         }
+
+         return created;
+     }
+
+     /**
+      * Ensures {@code file} exists as a directory, creating it and any missing
+      * parents.
+      *
+      * @return {@code file}, or null if it is not a directory after the attempt
+      */
+     public static File getCreateDirectory(File file) {
+         if (file.isDirectory()) {
+             return file;
+         }
+
+         // mkdirs() also returns false when the directory already exists, e.g. when
+         // it was created concurrently between the check above and this call. Only
+         // the final state decides whether the directory is usable.
+         if (!file.mkdirs() && !file.isDirectory()) {
+             LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!", file.toString());
+
+             return null;
+         }
+
+         return file;
+     }
+
+     /** Opens {@code filePath} for appending as a buffered PrintWriter using DEFAULT_CHAR_SET. */
+     public static PrintWriter createAppendPrintWriter(File filePath) throws UnsupportedEncodingException, FileNotFoundException {
+         return new PrintWriter(
+                 new BufferedWriter(
+                         new OutputStreamWriter(
+                                 new FileOutputStream(filePath, true), DEFAULT_CHAR_SET)));
+     }
+
+     /** Builds the index file name {@code index-<source>-<handlerName>.json}. */
+     public static String getIndexFileName(String source, String handlerName) {
+         return String.format(SpoolUtils.INDEX_FILE_NAME_FORMAT, source, handlerName);
+     }
+
+     /** Replaces the trailing {@code .json} of {@code filePath} with {@code _closed.json}. */
+     public static String getIndexDoneFile(String filePath) {
+         return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_CLOSED_SUFFIX;
+     }
+
+     /** Replaces the trailing {@code .json} of {@code filePath} with {@code _publish.json}. */
+     public static String getIndexPublishFile(String filePath) {
+         return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_PUBLISH_SUFFIX;
+     }
+
+     /** Returns true when {@code record} is non-null and the file at its path exists. */
+     public static boolean fileExists(IndexRecord record) {
+         return record != null && new File(record.getPath()).exists();
+     }
+
+     static String getSpoolFileName(String source, String handlerName, String guid) {
+         return String.format(SPOOL_FILE_NAME_FORMAT, source, handlerName, guid);
+     }
+
+     /**
+      * Picks a spool file path that exists neither in the spool directory nor
+      * in the archive folder. It starts from the plain spool file name and
+      * inserts an increasing sequence number before the extension until an
+      * unused name is found.
+      */
+     public static String getSpoolFilePath(SpoolConfiguration cfg, String spoolDir, String archiveFolder, String suffix) {
+         String fileName  = getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), suffix);
+         int    lastDot   = StringUtils.lastIndexOf(fileName, '.');
+         String baseName  = fileName.substring(0, lastDot);
+         String extension = fileName.substring(lastDot);
+         File   candidate = new File(spoolDir, fileName);
+
+         for (int sequence = 1; candidate.exists() || new File(archiveFolder, fileName).exists(); sequence++) {
+             fileName  = String.format(SPOOL_FILE_NAME_FORMAT_PREFIX, baseName, sequence, extension);
+             candidate = new File(spoolDir, fileName);
+         }
+
+         return candidate.getPath();
+     }
+
+     public static String getLineSeparator() {
+         return DEFAULT_LINE_SEPARATOR;
+     }
+
+     /** Serializes the record to JSON, right-pads it to IndexRecord.RECORD_SIZE and appends a line separator. */
+     public static String getRecordForWriting(IndexRecord record) {
+         String json = AtlasType.toJson(record);
+
+         return StringUtils.rightPad(json, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator();
+     }
+
+     /** Returns a blank line of IndexRecord.RECORD_SIZE spaces followed by a line separator. */
+     public static String getEmptyRecordForWriting() {
+         return RECORD_EMPTY;
+     }
+
+     /**
+      * Parses the index-file lines into records keyed by record id. Blank lines
+      * are skipped. A line that cannot be parsed is logged and skipped on its
+      * own, so one damaged entry does not discard the records that follow it.
+      *
+      * @param items raw lines; may be null or empty
+      * @return the parsed records; never null
+      */
+     public static IndexRecords createRecords(String[] items) {
+         IndexRecords records = new IndexRecords();
+
+         if (items == null) {
+             return records;
+         }
+
+         for (String item : items) {
+             if (StringUtils.isBlank(item)) {
+                 continue;
+             }
+
+             try {
+                 IndexRecord record = AtlasType.fromJson(item, IndexRecord.class);
+
+                 if (record == null) {
+                     LOG.error("SpoolUtils.createRecords(): skipping unparsable record: {}", item);
+
+                     continue;
+                 }
+
+                 records.getRecords().put(record.getId(), record);
+             } catch (Exception ex) {
+                 LOG.error("SpoolUtils.createRecords(): error loading records.", ex);
+             }
+         }
+
+         return records;
+     }
+ }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
new file mode 100644
index 0000000..2cacaaa
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutput;
+import java.io.PrintWriter;
+import java.util.List;
+
+/**
+ * Notification implementation that writes messages to the local spool file instead of
+ * sending them to a remote destination.
+ *
+ * Each message is written followed by the line separator to the writer obtained from
+ * {@link IndexManagement}. Messages that could not be spooled are handed to the
+ * {@link FailedMessagesLogger}, when one has been set.
+ */
+public class Spooler extends AbstractNotification {
+    private static final Logger LOG = LoggerFactory.getLogger(Spooler.class);
+
+    private final SpoolConfiguration configuration;
+    private final IndexManagement    indexManagement;
+    private       FailedMessagesLogger failedMessagesLogger;
+    private       boolean              isDrain;
+
+    public Spooler(SpoolConfiguration configuration, IndexManagement indexManagement) {
+        this.configuration   = configuration;
+        this.indexManagement = indexManagement;
+    }
+
+    /**
+     * Sets the logger that receives messages which could not be written to the spool.
+     */
+    public void setFailedMessagesLogger(FailedMessagesLogger failedMessagesLogger) {
+        this.failedMessagesLogger = failedMessagesLogger;
+    }
+
+    /**
+     * Marks the spooler as draining; every subsequent {@link #write(List)} is rejected.
+     */
+    public void setDrain() {
+        this.isDrain = true;
+    }
+
+    /**
+     * Consumers are not supported by the spooler.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public <T> List<NotificationConsumer<T>> createConsumers(org.apache.atlas.notification.NotificationInterface.NotificationType notificationType, int numConsumers) {
+        return null;
+    }
+
+    /**
+     * Spools the messages; when spooling fails they are passed to the failed-messages logger.
+     *
+     * @param type     notification type; not used by the spooler
+     * @param messages messages to spool
+     */
+    @Override
+    public void sendInternal(NotificationType type, List<String> messages) {
+        boolean ret = write(messages);
+
+        if (!ret) {
+            // writeToFailedMessages() is a no-op when no failed-messages logger is set
+            writeToFailedMessages(messages);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Writes the messages to the spool unless the spooler is draining.
+     *
+     * The write-in-progress flag of the index is reset on every exit path, including the
+     * rejected (draining) case.
+     *
+     * @param messages messages to spool
+     * @return {@code true} if all messages were written and flushed, {@code false} otherwise
+     */
+    @VisibleForTesting
+    boolean write(List<String> messages) {
+        final boolean ret;
+
+        try {
+            if (!getDrain()) {
+                indexManagement.setSpoolWriteInProgress();
+
+                ret = writeInternal(messages);
+            } else {
+                // include the rejected messages in the log line; without a placeholder they were silently dropped
+                LOG.error("Spooler.write(source={}): called after stop is called! Write will not be performed! messages={}", configuration.getSourceName(), messages);
+
+                ret = false;
+            }
+        } finally {
+            indexManagement.resetSpoolWriteInProgress();
+        }
+
+        return ret;
+    }
+
+    /**
+     * Logs every message through the failed-messages logger, if one is set.
+     */
+    private void writeToFailedMessages(List<String> messages) {
+        if (failedMessagesLogger != null) {
+            for (String message : messages) {
+                failedMessagesLogger.log(message);
+            }
+        }
+    }
+
+    /**
+     * Writes each message followed by the line separator to the spool writer, then flushes.
+     *
+     * @return {@code true} on success; {@code false} if any exception occurred (it is logged)
+     */
+    private boolean writeInternal(List<String> messages) {
+        boolean ret = false;
+
+        try {
+            byte[]     lineSeparatorBytes = SpoolUtils.getLineSeparator().getBytes(SpoolUtils.DEFAULT_CHAR_SET);
+            DataOutput pw                 = indexManagement.getSpoolWriter();
+
+            for (String message : messages) {
+                pw.write(message.getBytes(SpoolUtils.DEFAULT_CHAR_SET));
+                pw.write(lineSeparatorBytes);
+            }
+
+            indexManagement.flushSpoolWriter();
+
+            ret = true;
+        } catch (Exception exception) {
+            LOG.error("Spooler.writeInternal(source={}): error writing to file. messages={}", configuration.getSourceName(), messages, exception);
+
+            ret = false;
+        }
+
+        return ret;
+    }
+
+    private boolean getDrain() {
+        return this.isDrain;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java
new file mode 100644
index 0000000..21dad06
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.models;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.UUID;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * One entry of the spool index: identifies a spool file by id and path and tracks its
+ * processing state (status, current line, timestamps and failed-attempt count).
+ *
+ * Identity is defined by {@code id} only: two records are equal when their ids are equal.
+ *
+ * NOTE(review): {@code lastFailed} and {@code lastAttempt} have setters but no getters; with
+ * {@code fieldVisibility = NONE} they are presumably not written to JSON - confirm this is intended.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class IndexRecord implements Serializable {
+    public static final int    RECORD_SIZE              = 500;
+    public static final String STATUS_PENDING           = "PENDING";
+    public static final String STATUS_WRITE_IN_PROGRESS = "WRITE_IN_PROGRESS";
+    public static final String STATUS_READ_IN_PROGRESS  = "READ_IN_PROGRESS";
+    public static final String STATUS_DONE              = "DONE";
+
+    private String  id;
+    private String  path;
+    private int     line;
+    private long    created;
+    private long    writeCompleted;
+    private long    doneCompleted;
+    private long    lastSuccess;
+    private long    lastFailed;
+    private boolean lastAttempt;
+    private int     failedAttempt;
+    private String  status;
+
+    /**
+     * Creates a record without id or path, in status {@link #STATUS_WRITE_IN_PROGRESS}.
+     */
+    public IndexRecord() {
+        this.status      = STATUS_WRITE_IN_PROGRESS;
+        this.lastAttempt = false;
+    }
+
+    /**
+     * Creates a record for the given spool file path with a random UUID as id, the current
+     * time as creation time and status {@link #STATUS_WRITE_IN_PROGRESS}.
+     */
+    public IndexRecord(String path) {
+        this.id            = UUID.randomUUID().toString();
+        this.path          = path;
+        this.failedAttempt = 0;
+        this.status        = STATUS_WRITE_IN_PROGRESS;
+        this.created       = System.currentTimeMillis();
+
+        setLastAttempt(false);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexRecord [id=" + id + ", filePath=" + path
+                + ", linePosition=" + line + ", status=" + status
+                + ", fileCreateTime=" + created
+                + ", writeCompleteTime=" + writeCompleted
+                + ", doneCompleteTime=" + doneCompleted
+                + ", lastSuccessTime=" + lastSuccess
+                + ", lastFailedTime=" + lastFailed
+                + ", failedAttemptCount=" + failedAttempt
+                + ", lastAttempt=" + lastAttempt + "]";
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+    public void setLine(int line) {
+        this.line = line;
+    }
+
+    public int getLine() {
+        return line;
+    }
+
+    public void setCreated(long fileCreateTime) {
+        this.created = fileCreateTime;
+    }
+
+    public long getCreated() {
+        return this.created;
+    }
+
+    public void setWriteCompleted(long writeCompleted) {
+        this.writeCompleted = writeCompleted;
+    }
+
+    public long getWriteCompleted() {
+        return this.writeCompleted;
+    }
+
+    public void setDoneCompleted(long doneCompleted) {
+        this.doneCompleted = doneCompleted;
+    }
+
+    public long getDoneCompleted() {
+        return doneCompleted;
+    }
+
+    public void setLastSuccess(long lastSuccess) {
+        this.lastSuccess = lastSuccess;
+    }
+
+    public long getLastSuccess() {
+        return lastSuccess;
+    }
+
+    public void setLastFailed(long lastFailed) {
+        this.lastFailed = lastFailed;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public String getStatus() {
+        return this.status;
+    }
+
+    public void setLastAttempt(boolean lastAttempt) {
+        this.lastAttempt = lastAttempt;
+    }
+
+    public void setFailedAttempt(int failedAttempt) {
+        this.failedAttempt = failedAttempt;
+    }
+
+    public int getFailedAttempt() {
+        return failedAttempt;
+    }
+
+    /**
+     * Marks the record as {@link #STATUS_DONE}, stamps the done time and flags the last attempt as successful.
+     */
+    @JsonIgnore
+    public void setDone() {
+        setStatus(IndexRecord.STATUS_DONE);
+        setDoneCompleted(System.currentTimeMillis());
+        setLastAttempt(true);
+    }
+
+    /**
+     * Marks the record as {@link #STATUS_PENDING}, stamps the write-completed time and flags the last attempt as successful.
+     */
+    @JsonIgnore
+    public void setStatusPending() {
+        setStatus(IndexRecord.STATUS_PENDING);
+        setWriteCompleted(System.currentTimeMillis());
+        setLastAttempt(true);
+    }
+
+    /**
+     * Records a failed attempt: stamps the failure time, increments the failure count and clears the last-attempt flag.
+     */
+    @JsonIgnore
+    public void updateFailedAttempt() {
+        setLastFailed(System.currentTimeMillis());
+        incrementFailedAttemptCount();
+        setLastAttempt(false);
+    }
+
+    /**
+     * Compares two records by id.
+     *
+     * @param record record to compare with; may be {@code null}
+     * @return {@code true} only if this record has a non-null id equal to the other record's id
+     */
+    @JsonIgnore
+    public boolean equals(IndexRecord record) {
+        return record != null && this.id != null && this.id.equals(record.getId());
+    }
+
+    /**
+     * Overrides {@link Object#equals(Object)} so that collections and generic callers see the
+     * same id-based equality as {@link #equals(IndexRecord)}.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        return other instanceof IndexRecord && equals((IndexRecord) other);
+    }
+
+    /**
+     * Hash code derived from the id, consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return id != null ? id.hashCode() : 0;
+    }
+
+    /**
+     * Stores the line reached while reading, sets status {@link #STATUS_READ_IN_PROGRESS},
+     * stamps the last-success time and flags the last attempt as successful.
+     */
+    @JsonIgnore
+    public void setCurrentLine(int line) {
+        setLine(line);
+        setStatus(STATUS_READ_IN_PROGRESS);
+        setLastSuccess(System.currentTimeMillis());
+        setLastAttempt(true);
+    }
+
+    // The status checks compare constant-first so that a null status yields false instead of an NPE.
+
+    @JsonIgnore
+    public boolean isStatusDone() {
+        return STATUS_DONE.equals(this.status);
+    }
+
+    @JsonIgnore
+    public boolean isStatusWriteInProgress() {
+        return STATUS_WRITE_IN_PROGRESS.equals(this.status);
+    }
+
+    @JsonIgnore
+    public boolean isStatusReadInProgress() {
+        return STATUS_READ_IN_PROGRESS.equals(this.status);
+    }
+
+    @JsonIgnore
+    public void incrementFailedAttemptCount() {
+        this.failedAttempt++;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java
new file mode 100644
index 0000000..abcb837
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.models;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+
+/**
+ * Insertion-ordered collection of {@link IndexRecord}s keyed by record id.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class IndexRecords implements Serializable {
+    private LinkedHashMap<String, IndexRecord> records;
+
+    /**
+     * Creates an empty collection.
+     */
+    public IndexRecords() {
+        this.records = new LinkedHashMap<>();
+    }
+
+    /**
+     * @return the backing map (id to record); {@code null} if it was set to {@code null}
+     */
+    public Map<String, IndexRecord> getRecords() {
+        return records;
+    }
+
+    public void setRecords(LinkedHashMap<String, IndexRecord> records) {
+        this.records = records;
+    }
+
+    /**
+     * @return number of records held, 0 when there is no backing map
+     */
+    @JsonIgnore
+    public int size() {
+        final LinkedHashMap<String, IndexRecord> current = this.records;
+
+        if (current == null) {
+            return 0;
+        }
+
+        return current.size();
+    }
+
+    /**
+     * Removes the entry stored under the record's id; does nothing when there is no backing map.
+     */
+    @JsonIgnore
+    public void remove(IndexRecord record) {
+        final LinkedHashMap<String, IndexRecord> current = this.records;
+
+        if (current == null) {
+            return;
+        }
+
+        current.remove(record.getId());
+    }
+
+    /**
+     * Stores the record under its id, creating the backing map first if it is missing.
+     */
+    @JsonIgnore
+    public void add(IndexRecord record) {
+        LinkedHashMap<String, IndexRecord> target = this.records;
+
+        if (target == null) {
+            target       = new LinkedHashMap<>();
+            this.records = target;
+        }
+
+        target.put(record.getId(), record);
+    }
+
+    /**
+     * Same as {@link #remove(IndexRecord)}.
+     */
+    @JsonIgnore
+    public void delete(IndexRecord record) {
+        remove(record);
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java
new file mode 100644
index 0000000..5d5ad8c
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+/**
+ * File operation that opens a file for read/write, requests a lock on it and hands the open
+ * file back to the caller as a {@link DataInput} or {@link DataOutput}.
+ *
+ * The file, channel and lock of the most recent open are remembered so that {@link #flush()}
+ * and {@link #close()} can act on them. Each open replaces the remembered handles; handles
+ * from an earlier open are not closed here.
+ */
+public class FileLockedReadWrite extends FileOperation {
+    private RandomAccessFile raf;
+    private FileChannel      channel;
+    private FileLock         lock;
+
+    public FileLockedReadWrite(String source) {
+        super(source);
+    }
+
+    /**
+     * Remembers the given file and channel and requests a lock on the whole channel.
+     *
+     * @param json not used by this operation
+     * @return the result of {@code channel.tryLock()}
+     */
+    @Override
+    public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
+        this.raf     = randomAccessFile;
+        this.channel = channel;
+
+        final FileLock acquired = channel.tryLock();
+
+        this.lock = acquired;
+
+        return acquired;
+    }
+
+    /**
+     * Opens the file and returns it for reading.
+     */
+    public DataInput getInput(File file) throws IOException {
+        return getRaf(file);
+    }
+
+    /**
+     * Opens the file and returns it for writing.
+     */
+    public DataOutput getOutput(File file) throws IOException {
+        return getRaf(file);
+    }
+
+    /**
+     * Forces the remembered channel's content and metadata to storage; no-op before the first open.
+     */
+    public void flush() throws IOException {
+        if (channel == null) {
+            return;
+        }
+
+        channel.force(true);
+    }
+
+    /**
+     * Releases the remembered lock and closes the remembered channel and file.
+     */
+    public void close() {
+        super.close(this.raf, this.channel, this.lock);
+    }
+
+    // opens the file in "rws" mode and registers it (with a lock request) through run()
+    private RandomAccessFile getRaf(File file) throws IOException {
+        final RandomAccessFile opened = createRandomAccessFile(file);
+
+        run(opened, opened.getChannel(), StringUtils.EMPTY);
+
+        return opened;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java
similarity index 56%
copy from notification/src/main/java/org/apache/atlas/notification/NotificationException.java
copy to notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java
index 2dd9c9f..bfc113d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java
@@ -15,28 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.atlas.notification;
+package org.apache.atlas.notification.spool.utils.local;
-import org.apache.atlas.AtlasException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
-import java.util.List;
+/**
+ * File operation that writes the given text at the end of the file.
+ */
+public class FileOpAppend extends FileOperation {
-/**
- * Exception from notification.
- */
-public class NotificationException extends AtlasException {
- private List<String> failedMessages;
-
- public NotificationException(Exception e) {
- super(e);
+ public FileOpAppend(String source) {
+ super(source);
}
- public NotificationException(Exception e, List<String> failedMessages) {
- super(e);
- this.failedMessages = failedMessages;
- }
+ /**
+ * Requests a lock on the region starting at the current end of the file and spanning
+ * {@code json.length()} bytes, positions the channel at the end of the file and writes
+ * {@code json} there.
+ *
+ * @param json text to append
+ * @return the result of the {@code tryLock} call
+ */
+ @Override
+ public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
+ FileLock lock = channel.tryLock(randomAccessFile.length(), json.length(), false);
+
+ channel.position(randomAccessFile.length());
+
+ randomAccessFile.writeBytes(json);
- public List<String> getFailedMessages() {
- return failedMessages;
+ return lock;
}
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java
new file mode 100644
index 0000000..8240a80
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.atlas.notification.spool.SpoolUtils;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+/**
+ * File operation that rewrites a file so that it contains only its non-blank records.
+ */
+public class FileOpCompaction extends FileOperation {
+    private final FileOpRead fileOpLoad;
+
+    public FileOpCompaction(String source) {
+        super(source);
+
+        this.fileOpLoad = new FileOpRead(source);
+    }
+
+    /**
+     * Requests a lock on the file, loads its records through a {@link FileOpRead}, truncates
+     * the file and writes back every non-blank record.
+     *
+     * The loaded records no longer carry their line separator (the reader splits on it), so
+     * the separator is written again after each record; otherwise the compacted file would
+     * hold all records on a single line.
+     *
+     * @param json not used by this operation
+     * @return the result of the {@code tryLock} call
+     */
+    @Override
+    public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
+        FileLock lock = file.getChannel().tryLock();
+
+        fileOpLoad.perform(getFile(), StringUtils.EMPTY);
+
+        file.getChannel().truncate(0);
+
+        String[] rawItems = fileOpLoad.getItems();
+
+        if (rawItems != null) {
+            final String lineSeparator = SpoolUtils.getLineSeparator();
+
+            for (String record : rawItems) {
+                if (StringUtils.isNotBlank(record)) {
+                    file.writeBytes(record);
+                    file.writeBytes(lineSeparator); // restore the separator removed when the records were split
+                }
+            }
+        }
+
+        return lock;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java
similarity index 52%
copy from notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
copy to notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java
index 2dd970e..19243a6 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java
@@ -15,27 +15,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.atlas.kafka;
+package org.apache.atlas.notification.spool.utils.local;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
-import org.apache.commons.configuration.Configuration;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
-/**
- * Provider class for Notification interfaces
- */
-public class NotificationProvider {
- private static KafkaNotification kafka;
+/**
+ * File operation that overwrites the record matching the operation's id with the given text.
+ */
+public class FileOpDelete extends FileOperation {
+ public FileOpDelete(String source) {
+ super(source);
+ }
+
+ /**
+ * Looks up the record whose line contains the operation's id; when found, requests a lock
+ * on that region, positions the channel at the record and writes {@code json} over it.
+ * NOTE(review): presumably {@code json} is the empty record here - confirm against callers.
+ *
+ * @param json text written at the record's position
+ * @return the result of the {@code tryLock} call, or {@code null} when no record matched
+ */
+ @Override
+ public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
+ final FileLock ret;
+ final long position = find(file, getId());
- public static KafkaNotification get() {
- if (kafka == null) {
- try {
- Configuration applicationProperties = ApplicationProperties.get();
- kafka = new KafkaNotification(applicationProperties);
- } catch (AtlasException e) {
- throw new RuntimeException(e);
- }
+ if (position < 0) {
+ ret = null;
+ } else {
+ ret = channel.tryLock(position, json.length(), false);
+
+ channel.position(position);
+
+ file.writeBytes(json);
}
- return kafka;
+
+ return ret;
}
}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java
new file mode 100644
index 0000000..b228090
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.atlas.notification.spool.SpoolUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+/**
+ * File operation that reads a whole file into memory and splits it into items on the spool
+ * line separator. The items of the latest read are available through {@link #getItems()}.
+ */
+public class FileOpRead extends FileOperation {
+    private static final Logger LOG = LoggerFactory.getLogger(FileOpRead.class);
+
+    private String[] items;
+
+    public FileOpRead(String source) {
+        super(source);
+    }
+
+    /**
+     * Reads the complete file, decodes it with the platform default charset and splits it on
+     * the characters of the spool line separator. No lock is taken.
+     *
+     * @param json not used by this operation
+     * @return always {@code null}
+     */
+    @Override
+    public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
+        // forget the previous result first so a failed read does not leave stale items behind
+        items = null;
+
+        final byte[] contents = new byte[(int) randomAccessFile.length()];
+
+        randomAccessFile.readFully(contents);
+
+        final String text = new String(contents);
+
+        if (StringUtils.isNotEmpty(text)) {
+            items = StringUtils.split(text, SpoolUtils.getLineSeparator());
+        }
+
+        final int rawRecords = (items != null) ? items.length : 0;
+
+        LOG.info("FileOpRead.run(source={}): loaded file {}, raw records={}", this.getSource(), this.getFile().getAbsolutePath(), rawRecords);
+
+        return null;
+    }
+
+    /**
+     * @return items of the latest read; {@code null} when nothing was read or the file was empty
+     */
+    public String[] getItems() {
+        return items;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java
new file mode 100644
index 0000000..cd58e86
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+/**
+ * File operation that overwrites the record matching an id, or appends the text when no
+ * record matches.
+ */
+public class FileOpUpdate extends FileOperation {
+    private String             id;
+    private final FileOpAppend fileOpAppend;
+
+    /**
+     * @param source       source name passed to the base operation
+     * @param fileOpAppend append operation used when the id is not found in the file
+     */
+    public FileOpUpdate(String source, FileOpAppend fileOpAppend) {
+        super(source);
+
+        this.fileOpAppend = fileOpAppend;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Looks up the record whose line contains {@link #getId()}. If there is none, the text is
+     * appended through the append operation; otherwise a lock is requested on the record's
+     * region, the channel is positioned at the record and {@code json} is written over it.
+     *
+     * @param json text to write
+     * @return the lock obtained by whichever path was taken
+     */
+    @Override
+    public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
+        final long offset = find(file, getId());
+
+        if (offset < 0) {
+            return fileOpAppend.run(file, channel, json);
+        }
+
+        final FileLock recordLock = channel.tryLock(offset, json.length(), false);
+
+        channel.position(offset);
+        file.writeBytes(json);
+
+        return recordLock;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java
new file mode 100644
index 0000000..e5bf9d2
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.atlas.notification.spool.SpoolUtils;
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for operations on a spool/index file.
+ *
+ * {@code perform(...)} opens the file in "rws" mode, hands the open file and its channel to
+ * the subclass's {@link #run}, and afterwards forces, unlocks and closes everything. When
+ * {@code run} fails with an {@link OverlappingFileLockException} the operation is retried
+ * after a short random pause, up to {@code MAX_RETRY_ATTEMPTS} times.
+ */
+public abstract class FileOperation {
+    private static final Logger LOG = LoggerFactory.getLogger(FileOperation.class);
+
+    private static final int    MAX_RETRY_ATTEMPTS               = 5;
+    private static final String RANDOM_ACCESS_FILE_OPEN_MODE_RWS = "rws";
+    private static final String RANDOM_ACCESS_FILE_OPEN_MODE_R   = "r";
+
+    private final String source;
+    private       File   file;
+    private       String id;
+
+    /**
+     * Opens the file read-only.
+     */
+    public static RandomAccessFile createRandomAccessFileForRead(File file) throws FileNotFoundException {
+        return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_R);
+    }
+
+    /**
+     * Opens the file for reading and writing in "rws" mode.
+     */
+    public static RandomAccessFile createRandomAccessFile(File file) throws FileNotFoundException {
+        return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_RWS);
+    }
+
+    /**
+     * Reads lines from the file's current position until one contains {@code id}.
+     *
+     * The returned offset is the channel position after the matching line minus the line
+     * separator length and {@link IndexRecord#RECORD_SIZE}, i.e. it assumes the matching line
+     * is a full fixed-size record. The search stops at end of file or at the first empty line.
+     *
+     * @return offset of the matching record, or -1 when no line matched
+     */
+    public static long find(RandomAccessFile raf, String id) throws IOException {
+        while (true) {
+            String line = raf.readLine();
+
+            if (StringUtils.isEmpty(line)) {
+                break;
+            }
+
+            if (line.contains(id)) {
+                return raf.getChannel().position() - SpoolUtils.getLineSeparator().length() - IndexRecord.RECORD_SIZE;
+            }
+        }
+
+        return -1;
+    }
+
+    public FileOperation(String source) {
+        this(source, false);
+    }
+
+    /**
+     * @param source            name used in log messages
+     * @param notifyConcurrency currently not used
+     */
+    public FileOperation(String source, boolean notifyConcurrency) {
+        this.source = source;
+    }
+
+    public String getSource() {
+        return source;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Runs the operation on the file with an empty payload.
+     */
+    public void perform(File file) {
+        perform(file, StringUtils.EMPTY);
+    }
+
+    /**
+     * Runs the operation on the file with the given payload, retrying on overlapping locks.
+     */
+    public void perform(File file, String json) {
+        setFile(file);
+
+        performWithRetry(file, json);
+    }
+
+    /**
+     * Sets the id the operation should act on, then runs it with the given payload.
+     */
+    public void perform(File file, String id, String json) {
+        this.setId(id);
+        perform(file, json);
+    }
+
+    /**
+     * The actual work of the operation, executed on an open file.
+     *
+     * @param randomAccessFile the file, opened in "rws" mode
+     * @param channel          the file's channel
+     * @param json             payload of the operation; may be empty
+     * @return the lock taken by the operation, or {@code null}; it is released by the caller
+     */
+    public abstract FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException;
+
+
+    protected File getFile() {
+        return this.file;
+    }
+
+    protected String getId() {
+        return this.id;
+    }
+
+    /**
+     * Forces the channel to storage, releases the lock and closes channel and file.
+     *
+     * Every step runs even if an earlier one fails, so a failing {@code force()} can no longer
+     * leave the lock held or the file open. Failures are logged, never thrown.
+     */
+    protected void close(RandomAccessFile randomAccessFile, FileChannel channel, FileLock lock) {
+        try {
+            if (channel != null) {
+                channel.force(true);
+            }
+        } catch (IOException exception) {
+            LOG.error("FileOperation(source={}).close(): failed", getSource(), exception);
+        }
+
+        try {
+            if (lock != null) {
+                lock.release();
+            }
+        } catch (IOException exception) {
+            LOG.error("FileOperation(source={}).close(): failed", getSource(), exception);
+        }
+
+        try {
+            if (channel != null) {
+                channel.close();
+            }
+        } catch (IOException exception) {
+            LOG.error("FileOperation(source={}).close(): failed", getSource(), exception);
+        }
+
+        try {
+            if (randomAccessFile != null) {
+                randomAccessFile.close();
+            }
+        } catch (IOException exception) {
+            LOG.error("FileOperation(source={}).close(): failed", getSource(), exception);
+        }
+    }
+
+
+    private void setFile(File file) {
+        this.file = file;
+    }
+
+    /**
+     * Runs the operation, retrying after a random pause of 1 to 451 ms whenever the lock
+     * overlaps one already held in this JVM. Gives up after {@code MAX_RETRY_ATTEMPTS} tries
+     * or as soon as the thread is interrupted.
+     */
+    private void performWithRetry(File file, String json) {
+        for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+            try {
+                performOperation(json);
+                return;
+            } catch (OverlappingFileLockException e) {
+                int timeout = 1 + (50 * RandomUtils.nextInt(10));
+
+                LOG.info("FileOperation.performWithRetry(source={}): {}: {}: Waiting: {} ms...", getSource(), getClass().getSimpleName(), file.getName(), timeout);
+
+                try {
+                    TimeUnit.MILLISECONDS.sleep(timeout);
+                } catch (InterruptedException ex) {
+                    LOG.error("FileOperation.performWithRetry(source={}): {}: Interrupted!", getSource(), file.getAbsolutePath(), ex);
+
+                    // restore the flag for the caller and stop: channel I/O on an interrupted thread would fail anyway
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+
+                LOG.info("FileOperation.performWithRetry(source={}): {}: Re-trying: {}!", getSource(), file.getAbsolutePath(), i);
+            }
+        }
+
+        LOG.error("FileOperation.performWithRetry(source={}): {}: appendRecord: Could not write.", getSource(), file.getAbsolutePath());
+    }
+
+    /**
+     * Opens the file, delegates to {@link #run} and always cleans up afterwards.
+     * An {@link OverlappingFileLockException} thrown by {@code run} propagates to the caller.
+     *
+     * @return {@code true} if {@code run} completed, {@code false} if an I/O error was logged
+     */
+    private boolean performOperation(String json) {
+        RandomAccessFile randomAccessFile = null;
+        FileChannel      channel          = null;
+        FileLock         lock             = null;
+        boolean          ret              = false;
+
+        try {
+            randomAccessFile = createRandomAccessFile(getFile());
+            channel          = randomAccessFile.getChannel();
+            lock             = run(randomAccessFile, channel, json);
+            ret              = true;
+        } catch (FileNotFoundException e) {
+            LOG.error("FileOperation.performOperation(source={}): file={}: file not found", getSource(), getFile().getAbsolutePath(), e);
+        } catch (IOException exception) {
+            // pass the exception so the cause of the failure is not lost
+            LOG.error("FileOperation.performOperation(source={}): file={}: failed", getSource(), getFile().getAbsolutePath(), exception);
+        } finally {
+            close(randomAccessFile, channel, lock);
+        }
+
+        return ret;
+    }
+}
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 94cb70d..d7e4959 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -112,7 +112,7 @@ public class AbstractNotificationTest {
}
@Override
- protected void sendInternal(NotificationType notificationType, List<String> notificationMessages)
+ public void sendInternal(NotificationType notificationType, List<String> notificationMessages)
throws NotificationException {
type = notificationType;
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
new file mode 100644
index 0000000..167efbe
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.atlas.notification.NotificationInterface.NotificationType.HOOK;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests for the file-spool components: index setup, spooling messages through
+ * {@link Spooler} and dispatching them through {@link Publisher} to a recording
+ * notification handler.
+ */
+public class AtlasFileSpoolTest extends BaseTest {
+    /** Number of messages written by each spooling loop. */
+    private static final int MAX_RECORDS = 50;
+
+    /**
+     * Notification handler that records every message passed to
+     * {@code sendInternal}; all other operations are no-ops.
+     */
+    private static class MessageHandlerSpy extends AbstractNotification {
+        private final List<String> publishedMessages = new ArrayList<>();
+
+        public List<String> getMessages() {
+            return publishedMessages;
+        }
+
+        @Override
+        public void init(String source, Object failedMessagesLogger) {
+        }
+
+        @Override
+        public void setCurrentUser(String user) {
+        }
+
+        @Override
+        public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
+            publishedMessages.addAll(messages);
+        }
+
+        @Override
+        public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
+            return null;
+        }
+
+        @Override
+        public <T> void send(NotificationType type, T... messages) throws NotificationException {
+        }
+
+        @Override
+        public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    /** Calling init() twice must leave the spool/archive directories and both index files in place. */
+    @Test
+    public void indexSetupMultipleTimes() throws IOException, AtlasException {
+        SpoolConfiguration cfg             = getSpoolConfiguration();
+        IndexManagement    indexManagement = new IndexManagement(cfg);
+
+        for (int i = 0; i < 2; i++) {
+            indexManagement.init();
+            assertTrue(cfg.getSpoolDir().exists());
+            assertTrue(cfg.getArchiveDir().exists());
+
+            File indexFile     = indexManagement.getIndexFileManager().getIndexFile();
+            File indexDoneFile = indexManagement.getIndexFileManager().getDoneFile();
+
+            assertTrue(indexFile.exists(), "File not created: " + indexFile.getAbsolutePath());
+            assertTrue(indexDoneFile.exists(), "File not created: " + indexDoneFile.getAbsolutePath());
+        }
+    }
+
+    /** Spools MAX_RECORDS single-message batches; passes if no exception is thrown. */
+    @Test
+    public void spoolerTest() throws IOException, AtlasException {
+        SpoolConfiguration cfg             = getSpoolConfigurationTest();
+        IndexManagement    indexManagement = new IndexManagement(cfg);
+
+        indexManagement.init();
+
+        Spooler spooler = new Spooler(cfg, indexManagement);
+        for (int i = 0; i < MAX_RECORDS; i++) {
+            spooler.write(Collections.singletonList("message: " + i));
+        }
+
+        indexManagement.stop();
+    }
+
+    /** Dispatches the first index record left behind by {@link #spoolerTest()}. */
+    @Test(dependsOnMethods = "spoolerTest")
+    public void publisherTest() throws IOException, AtlasException, InterruptedException {
+        SpoolConfiguration cfg = getSpoolConfigurationTest();
+
+        IndexManagement indexManagement = new IndexManagement(cfg);
+        indexManagement.init();
+
+        MessageHandlerSpy messageHandler = new MessageHandlerSpy();
+        Publisher         publisher      = new Publisher(cfg, indexManagement, messageHandler);
+        boolean           ret            = publisher.processAndDispatch(indexManagement.getIndexFileManager().getRecords().get(0));
+
+        publisher.setDrain();
+        assertTrue(ret);
+        TimeUnit.SECONDS.sleep(5);
+
+        // NOTE(review): size() >= 0 is always true, so this assertion cannot fail.
+        // Tighten it once the expected number of dispatched messages is confirmed.
+        assertTrue(messageHandler.getMessages().size() >= 0);
+    }
+
+    /** Only verifies that init() completes without throwing; no records are inspected. */
+    @Test
+    public void indexRecordsRead() throws IOException, AtlasException {
+        SpoolConfiguration spoolCfg        = getSpoolConfigurationTest();
+        IndexManagement    indexManagement = new IndexManagement(spoolCfg);
+
+        indexManagement.init();
+    }
+
+    /**
+     * Runs several spooler/publisher pairs against the same spool configuration and
+     * expects no index records to remain once every pair has finished.
+     */
+    @Test
+    public void concurrentWriteAndPublish() throws InterruptedException, IOException, AtlasException {
+        final int          MAX_PROCESSES = 4;
+        SpoolConfiguration spoolCfg      = getSpoolConfigurationTest(5);
+
+        IndexManagement[]   im1               = new IndexManagement[MAX_PROCESSES];
+        MessageHandlerSpy[] messageHandlerSpy = new MessageHandlerSpy[MAX_PROCESSES];
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            messageHandlerSpy[i] = new MessageHandlerSpy();
+            im1[i]               = new IndexManagement(spoolCfg);
+        }
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            im1[i].init();
+        }
+
+        IndexManagement imVerify = new IndexManagement(spoolCfg);
+        imVerify.init();
+
+        // NOTE(review): always true; effectively only checks that getRecords() does not throw.
+        assertTrue(imVerify.getIndexFileManager().getRecords().size() >= 0);
+
+        Thread[] th1 = new Thread[MAX_PROCESSES];
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            th1[i] = new Thread(new MessagePump(new Spooler(spoolCfg, im1[i]), new Publisher(spoolCfg, im1[i], messageHandlerSpy[i])));
+        }
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            th1[i].start();
+        }
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            th1[i].join();
+        }
+
+        imVerify = new IndexManagement(spoolCfg);
+        imVerify.init();
+        assertEquals(imVerify.getIndexFileManager().getRecords().size(), 0);
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            // NOTE(review): always true; the per-handler message count is not actually verified.
+            assertTrue(messageHandlerSpy[i].getMessages().size() >= 0);
+        }
+    }
+
+    /**
+     * Starts the publisher on its own thread, spools MAX_RECORDS messages with a
+     * short random pause between them, then asks the publisher to drain and waits
+     * for it to finish.
+     */
+    private static class MessagePump implements Runnable {
+        private final Spooler   spooler;
+        private final Publisher publisher;
+
+        public MessagePump(Spooler spooler, Publisher publisher) {
+            this.spooler   = spooler;
+            this.publisher = publisher;
+        }
+
+        @Override
+        public void run() {
+            Thread publisherThread = new Thread(publisher);
+            publisherThread.start();
+
+            try {
+                for (int i = 0; i < MAX_RECORDS; i++) {
+                    try {
+                        spooler.send(HOOK, String.format("%s-%s", "message", i));
+
+                        Thread.sleep(RandomUtils.nextInt(10, 100));
+                    } catch (NotificationException exception) {
+                        // No logger is available in this test; keep the trace on stderr and carry on.
+                        exception.printStackTrace();
+                    }
+                }
+
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                // Stop pumping and preserve the interrupt status for whoever owns this thread.
+                Thread.currentThread().interrupt();
+            } finally {
+                // Always request the drain, otherwise the publisher thread is never told to stop.
+                publisher.setDrain();
+            }
+
+            try {
+                publisherThread.join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @AfterClass
+    public void tearDown() {
+        FileUtils.deleteQuietly(new File(spoolDirTest));
+    }
+}
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
new file mode 100644
index 0000000..7ca745f
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Common fixture for the spool tests. It knows where the checked-in spool resources
+ * live, where the scratch "-test" directory is, and how to build a
+ * {@link SpoolConfiguration} for either location.
+ */
+public class BaseTest {
+    /** Directory holding the checked-in spool resources. */
+    public static String spoolDir = System.getProperty("user.dir") + "/src/test/resources/spool";
+    /** Scratch directory derived from {@link #spoolDir}. */
+    public static String spoolDirTest = spoolDir + "-test";
+
+    protected final String SOURCE_TEST = "test-src";
+    protected final String SOURCE_TEST_HANDLER = "1";
+
+    protected final String knownIndexFilePath = "index-test-src-1.json";
+    protected final String knownIndexDoneFilePath = "index-test-src-1_closed.json";
+
+    protected File archiveDir = new File(spoolDir, "archive");
+    protected File indexFile = new File(spoolDir, knownIndexFilePath);
+    protected File indexDoneFile = new File(spoolDir, knownIndexDoneFilePath);
+
+    /** @return configuration rooted at {@link #spoolDir}, using the default handler name */
+    public SpoolConfiguration getSpoolConfiguration() {
+        return getSpoolConfiguration(spoolDir, SOURCE_TEST_HANDLER);
+    }
+
+    /** @return configuration rooted at {@link #spoolDirTest}, using the default handler name */
+    public SpoolConfiguration getSpoolConfigurationTest() {
+        return getSpoolConfiguration(spoolDirTest, SOURCE_TEST_HANDLER);
+    }
+
+    /** @return configuration rooted at {@link #spoolDirTest}, with {@code testId} as the handler name */
+    public SpoolConfiguration getSpoolConfigurationTest(Integer testId) {
+        String handlerName = testId.toString();
+
+        return getSpoolConfiguration(spoolDirTest, handlerName);
+    }
+
+    /**
+     * Builds a spool configuration for the given directory and handler, with the
+     * source set to {@link #SOURCE_TEST}.
+     */
+    public SpoolConfiguration getSpoolConfiguration(String spoolDir, String handlerName) {
+        Configuration      properties         = getConfiguration(spoolDir);
+        SpoolConfiguration spoolConfiguration = new SpoolConfiguration(properties, handlerName);
+
+        spoolConfiguration.setSource(SOURCE_TEST);
+
+        return spoolConfiguration;
+    }
+
+    /**
+     * Creates the raw property set: the spool directory, a destination retry of
+     * 2000 ms and a file rollover of 2 seconds.
+     */
+    public Configuration getConfiguration(String spoolDir) {
+        PropertiesConfiguration properties = new PropertiesConfiguration();
+
+        properties.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, spoolDir);
+        properties.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_DEST_RETRY_MS, String.valueOf(2000));
+        properties.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, String.valueOf(2));
+
+        return properties;
+    }
+
+    /**
+     * Copies the known index file into the scratch directory under a name in which
+     * the '1' of the known file name is replaced by {@code id}.
+     */
+    protected File getNewIndexFile(char id) throws IOException {
+        String copyName = knownIndexFilePath.replace('1', id);
+        File   copy     = new File(spoolDirTest, copyName);
+
+        FileUtils.copyFile(indexFile, copy);
+
+        return copy;
+    }
+
+    /**
+     * Copies the known index-done file into the scratch directory under a name in
+     * which the '1' of the known file name is replaced by {@code id}.
+     */
+    protected File getNewIndexDoneFile(char id) throws IOException {
+        String copyName = knownIndexDoneFilePath.replace('1', id);
+        File   copy     = new File(spoolDirTest, copyName);
+
+        FileUtils.copyFile(indexDoneFile, copy);
+
+        return copy;
+    }
+}
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java
new file mode 100644
index 0000000..f9d2a06
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.models.IndexRecords;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests for {@code IndexManagement.IndexFileManager}: file-name generation, loading
+ * the checked-in index, and adding, updating and removing index records.
+ */
+public class IndexManagementTest extends BaseTest {
+    @Test
+    public void fileNameGeneration() {
+        String handlerName = "someHandler";
+        SpoolConfiguration cfg = getSpoolConfiguration(spoolDir, handlerName);
+
+        IndexRecord record = new IndexRecord(StringUtils.EMPTY);
+        Assert.assertEquals(SpoolUtils.getIndexFileName(cfg.getSourceName(), cfg.getMessageHandlerName()), "index-test-src-someHandler.json");
+        Assert.assertTrue(SpoolUtils.getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), record.getId()).startsWith("spool-test-src-someHandler-"));
+    }
+
+    /** The checked-in index file holds two records with ids "1" and "2". */
+    @Test
+    public void verifyLoad() throws IOException {
+        final int expectedRecords = 2;
+        SpoolConfiguration cfg = getSpoolConfiguration();
+
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, cfg.getIndexFile(), cfg.getIndexDoneFile(), null, 2);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), expectedRecords);
+
+        Assert.assertEquals(indexFileManager.getRecords().get(0).getId(), "1");
+        Assert.assertEquals(indexFileManager.getRecords().get(1).getId(), "2");
+    }
+
+    /** Adds three records, updates two of them, reloads the file, then removes them again. */
+    @Test
+    public void addAndRemove() throws IOException {
+        File newIndexFile = getNewIndexFile('3');
+        File newIndexDoneFile = getNewIndexDoneFile('3');
+
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2);
+
+        int expectedCount = 2;
+        Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount);
+
+        IndexRecord r3 = indexFileManager.add("3.log");
+        IndexRecord r4 = indexFileManager.add("4.log");
+        IndexRecord r5 = indexFileManager.add("5.log");
+
+        r4.updateFailedAttempt();
+        indexFileManager.updateIndex(r4);
+
+        r5.setLine(100);
+        indexFileManager.updateIndex(r5);
+
+        IndexRecords records = indexFileManager.loadRecords(newIndexFile);
+        Assert.assertTrue(records.getRecords().containsKey(r3.getId()));
+        Assert.assertTrue(records.getRecords().containsKey(r4.getId()));
+        Assert.assertTrue(records.getRecords().containsKey(r5.getId()));
+
+        Assert.assertEquals(records.getRecords().get(r3.getId()).getStatus(), r3.getStatus());
+        Assert.assertEquals(records.getRecords().get(r4.getId()).getFailedAttempt(), r4.getFailedAttempt());
+        Assert.assertEquals(records.getRecords().get(r5.getId()).getLine(), r5.getLine());
+
+        indexFileManager.remove(r3);
+        indexFileManager.remove(r4);
+        indexFileManager.remove(r5);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount);
+    }
+
+    /**
+     * Runs the add / modify / remove / archive steps in a fixed order against one
+     * IndexFileManager; each helper relies on the state left by the previous one.
+     */
+    @Test
+    public void verifyOperations() throws IOException {
+        SpoolConfiguration cfg = getSpoolConfigurationTest();
+
+        File newIndexFile = getNewIndexFile('2');
+        File newIndexDoneFile = getNewIndexDoneFile('2');
+
+        File archiveDir = cfg.getArchiveDir();
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2);
+
+        verifyAdding(indexFileManager);
+        verifySaveAndLoad(indexFileManager);
+        verifyRemove(indexFileManager);
+        verifyRecords(indexFileManager);
+
+        checkDoneFile(newIndexDoneFile, archiveDir, 2, "5.log");
+
+        verifyArchiving(indexFileManager);
+    }
+
+    private void verifyRecords(IndexManagement.IndexFileManager indexFileManager) {
+        List<IndexRecord> records = indexFileManager.getRecords();
+
+        Assert.assertEquals(records.size(), 5);
+        Assert.assertTrue(records.get(3).getPath().endsWith("3.log"));
+        Assert.assertEquals(records.get(3).getStatus(), IndexRecord.STATUS_WRITE_IN_PROGRESS);
+        Assert.assertEquals(records.get(2).getFailedAttempt(), 0);
+        Assert.assertEquals(records.get(1).getDoneCompleted(), 0);
+        Assert.assertEquals(records.get(0).getLine(), 0);
+        Assert.assertFalse(records.get(0).getLastSuccess() != 0);
+    }
+
+    /** Registers four new spool files, on top of the two records already in the copied index. */
+    private void verifyAdding(IndexManagement.IndexFileManager indexFileManager) throws IOException {
+        addFile(indexFileManager, spoolDirTest, "2.log");
+        addFile(indexFileManager, spoolDirTest, "3.log");
+        addFile(indexFileManager, spoolDirTest, "4.log");
+        addFile(indexFileManager, spoolDirTest, "5.log");
+    }
+
+    private void verifyArchiving(IndexManagement.IndexFileManager indexFileManager) {
+        // Index 1 is re-read after every removal, so four successive records are removed.
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+
+        // NOTE(review): archiveDir here is the inherited BaseTest field (spoolDir/archive),
+        // not the cfg.getArchiveDir() value used in verifyOperations -- confirm this is intended.
+        checkArchiveDir(archiveDir);
+    }
+
+    private void verifyRemove(IndexManagement.IndexFileManager indexFileManager) throws IOException {
+        indexFileManager.remove(indexFileManager.getRecords().get(5));
+
+        boolean isPending = indexFileManager.getRecords().size() > 0;
+        Assert.assertTrue(isPending);
+    }
+
+    private void verifySaveAndLoad(IndexManagement.IndexFileManager indexFileManager) throws IOException {
+        indexFileManager.getRecords().get(2).updateFailedAttempt();
+        indexFileManager.getRecords().get(3).setDone();
+        indexFileManager.getRecords().get(1).setDoneCompleted(333L);
+        indexFileManager.getRecords().get(0).setCurrentLine(999);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), 6);
+    }
+
+    /** If the directory exists it must contain exactly one file; a missing directory is accepted. */
+    private void checkArchiveDir(File archiveDir) {
+        if (!archiveDir.exists()) {
+            return;
+        }
+
+        File[] files = archiveDir.listFiles();
+        Assert.assertNotNull(files);
+        Assert.assertEquals(files.length, 1);
+    }
+
+    /** Creates an empty file in {@code dir} and registers its path with the index. */
+    private void addFile(IndexManagement.IndexFileManager indexFileManager, String dir, String fileName) throws IOException {
+        File file = new File(dir, fileName);
+        file.createNewFile();
+        indexFileManager.add(file.toString());
+    }
+
+    /**
+     * Loads the done-file as if it were an index (it is passed for both file arguments)
+     * and checks that it holds two records, the second ending in {@code expectedFilePath}.
+     * The {@code archiveDir} parameter is currently unused.
+     */
+    private void checkDoneFile(File newIndexDoneFile, File archiveDir, int maxArchiveFiles, String expectedFilePath) throws IOException {
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexDoneFile, newIndexDoneFile, null, maxArchiveFiles);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), 2);
+        Assert.assertTrue(indexFileManager.getRecords().get(1).getPath().endsWith(expectedFilePath));
+    }
+
+    @AfterClass
+    public void tearDown() {
+        FileUtils.deleteQuietly(new File(spoolDirTest));
+    }
+}
diff --git a/notification/src/test/resources/spool/archive/spool-1.json b/notification/src/test/resources/spool/archive/spool-1.json
new file mode 100644
index 0000000..bcbcecf
--- /dev/null
+++ b/notification/src/test/resources/spool/archive/spool-1.json
@@ -0,0 +1,3 @@
+message-0
+message-1
+message-2
\ No newline at end of file
diff --git a/notification/src/test/resources/spool/index-test-src-1.json b/notification/src/test/resources/spool/index-test-src-1.json
new file mode 100644
index 0000000..d4cee87
--- /dev/null
+++ b/notification/src/test/resources/spool/index-test-src-1.json
@@ -0,0 +1,2 @@
+{"id":"1","path":"0.log","line":0,"created":123,"writeCompleted":888,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"PENDING"}
+{"id":"2","path":"1.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"WRITE_IN_PROGRESS"}
diff --git a/notification/src/test/resources/spool/index-test-src-1_closed.json b/notification/src/test/resources/spool/index-test-src-1_closed.json
new file mode 100644
index 0000000..1b8881e
--- /dev/null
+++ b/notification/src/test/resources/spool/index-test-src-1_closed.json
@@ -0,0 +1 @@
+{"id":"x","path":"x.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"DONE"}
diff --git a/pom.xml b/pom.xml
index b924201..91fd593 100644
--- a/pom.xml
+++ b/pom.xml
@@ -697,6 +697,7 @@
<javax.servlet.version>3.1.0</javax.servlet.version>
<guava.version>25.1-jre</guava.version>
<antlr4.version>4.7</antlr4.version>
+ <log4j.version>2.8</log4j.version>
<!-- Needed for hooks -->
<aopalliance.version>1.0</aopalliance.version>