You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/10/22 21:08:19 UTC

[atlas] branch master updated: ATLAS-3427: Atlas Hook Enhancements for improved resiliancy.

This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new de87bc5  ATLAS-3427: Atlas Hook Enhancements for improved resiliancy.
de87bc5 is described below

commit de87bc5022627d82cb8e6048b6728e7028a4af25
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu Oct 22 13:45:47 2020 -0700

    ATLAS-3427: Atlas Hook Enhancements for improved resiliancy.
---
 .../java/org/apache/atlas/hive/hook/HiveHook.java  |   5 +
 .../atlas/hive/hook/HiveMetastoreHookImpl.java     |   2 +-
 notification/pom.xml                               |  12 +
 .../main/java/org/apache/atlas/hook/AtlasHook.java |  10 +
 .../apache/atlas/hook/FailedMessagesLogger.java    |  39 +-
 .../apache/atlas/kafka/NotificationProvider.java   |  48 +-
 .../atlas/notification/AbstractNotification.java   |   6 +-
 .../apache/atlas/notification/LogConfigUtils.java  | 108 +++++
 .../atlas/notification/NotificationException.java  |   4 +
 .../atlas/notification/NotificationInterface.java  |   8 +
 .../apache/atlas/notification/spool/Archiver.java  | 125 ++++++
 .../atlas/notification/spool/AtlasFileSpool.java   | 163 +++++++
 .../atlas/notification/spool/FileOperations.java   |  67 +++
 .../atlas/notification/spool/IndexManagement.java  | 487 +++++++++++++++++++++
 .../apache/atlas/notification/spool/Publisher.java | 210 +++++++++
 .../notification/spool/SpoolConfiguration.java     | 123 ++++++
 .../atlas/notification/spool/SpoolUtils.java       | 173 ++++++++
 .../apache/atlas/notification/spool/Spooler.java   | 127 ++++++
 .../notification/spool/models/IndexRecord.java     | 221 ++++++++++
 .../notification/spool/models/IndexRecords.java    |  89 ++++
 .../spool/utils/local/FileLockedReadWrite.java     |  73 +++
 .../utils/local/FileOpAppend.java}                 |  33 +-
 .../spool/utils/local/FileOpCompaction.java        |  56 +++
 .../spool/utils/local/FileOpDelete.java}           |  42 +-
 .../notification/spool/utils/local/FileOpRead.java |  66 +++
 .../spool/utils/local/FileOpUpdate.java            |  60 +++
 .../spool/utils/local/FileOperation.java           | 181 ++++++++
 .../notification/AbstractNotificationTest.java     |   2 +-
 .../notification/spool/AtlasFileSpoolTest.java     | 228 ++++++++++
 .../apache/atlas/notification/spool/BaseTest.java  |  78 ++++
 .../notification/spool/IndexManagementTest.java    | 189 ++++++++
 .../src/test/resources/spool/archive/spool-1.json  |   3 +
 .../src/test/resources/spool/index-test-src-1.json |   2 +
 .../resources/spool/index-test-src-1_closed.json   |   1 +
 pom.xml                                            |   1 +
 35 files changed, 2962 insertions(+), 80 deletions(-)

diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 6513234..e48967d 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -164,6 +164,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public HiveHook() {
     }
 
+    public HiveHook(String name) {
+        super(name);
+    }
+
+
     @Override
     public void run(HookContext hookContext) throws Exception {
         if (LOG.isDebugEnabled()) {
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
index 3c0f0c1..f01419c 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java
@@ -45,7 +45,7 @@ public class HiveMetastoreHookImpl extends MetaStoreEventListener {
     public HiveMetastoreHookImpl(Configuration config) {
         super(config);
 
-        this.hiveHook = new HiveHook();
+        this.hiveHook = new HiveHook(this.getClass().getSimpleName());
         this.hook     = new HiveMetastoreHook();
     }
 
diff --git a/notification/pom.xml b/notification/pom.xml
index 8affd59..740e8e5 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -56,6 +56,18 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
         </dependency>
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 8659126..26c2d8f 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -126,6 +126,7 @@ public abstract class AtlasHook {
                     try {
                         LOG.info("==> Shutdown of Atlas Hook");
 
+                        notificationInterface.close();
                         executor.shutdown();
                         executor.awaitTermination(SHUTDOWN_HOOK_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
                         executor = null;
@@ -141,6 +142,15 @@ public abstract class AtlasHook {
         LOG.info("Created Atlas Hook");
     }
 
+    public AtlasHook() {
+        notificationInterface.init(this.getClass().getSimpleName(), failedMessagesLogger);
+    }
+
+    public AtlasHook(String name) {
+        LOG.info("AtlasHook: Spool name: Passed from caller.: {}", name);
+        notificationInterface.init(name, failedMessagesLogger);
+    }
+
     /**
      * Notify atlas of the entity through message. The entity can be a
      * complex entity with reference to other entities.
diff --git a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
index b319e81..5488c1c 100644
--- a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
+++ b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
@@ -19,16 +19,14 @@
 package org.apache.atlas.hook;
 
 
-import org.apache.log4j.Appender;
+import org.apache.atlas.notification.LogConfigUtils;
 import org.apache.log4j.DailyRollingFileAppender;
-import org.apache.log4j.FileAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Enumeration;
 
 /**
  * A logger wrapper that can be used to write messages that failed to be sent to a log file.
@@ -46,7 +44,7 @@ public class FailedMessagesLogger {
     }
 
     void init() {
-        String rootLoggerDirectory = getRootLoggerDirectory();
+        String rootLoggerDirectory = LogConfigUtils.getRootDir();
         if (rootLoggerDirectory == null) {
             return;
         }
@@ -62,38 +60,7 @@ public class FailedMessagesLogger {
         }
     }
 
-    /**
-     * Get the root logger file location under which the failed log messages will be written.
-     *
-     * Since this class is used in Hooks which run within JVMs of other components like Hive,
-     * we want to write the failed messages file under the same location as where logs from
-     * the host component are saved. This method attempts to get such a location from the
-     * root logger's appenders. It will work only if at least one of the appenders is a {@link FileAppender}
-     *
-     * @return directory under which host component's logs are stored.
-     */
-    private String getRootLoggerDirectory() {
-        String      rootLoggerDirectory = null;
-        Logger      rootLogger          = Logger.getRootLogger();
-        Enumeration allAppenders        = rootLogger.getAllAppenders();
-
-        if (allAppenders != null) {
-            while (allAppenders.hasMoreElements()) {
-                Appender appender = (Appender) allAppenders.nextElement();
-
-                if (appender instanceof FileAppender) {
-                    FileAppender fileAppender   = (FileAppender) appender;
-                    String       rootLoggerFile = fileAppender.getFile();
-
-                    rootLoggerDirectory = rootLoggerFile != null ? new File(rootLoggerFile).getParent() : null;
-                    break;
-                }
-            }
-        }
-        return rootLoggerDirectory;
-    }
-
-    void log(String message) {
+    public void log(String message) {
         logger.error(message);
     }
 }
diff --git a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
index 2dd970e..b35af97 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
@@ -19,23 +19,59 @@ package org.apache.atlas.kafka;
 
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.LogConfigUtils;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.spool.AtlasFileSpool;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
 
 /**
  * Provider class for Notification interfaces
  */
 public class NotificationProvider {
-    private static KafkaNotification kafka;
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
+
+    private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED    = "atlas.hook.spool.enabled";
+    private static final String CONF_ATLAS_HOOK_SPOOL_DIR        = "atlas.hook.spool.dir";
+
+    private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false;
 
-    public static KafkaNotification get() {
-        if (kafka == null) {
+    private static NotificationInterface notificationProvider;
+
+    public static NotificationInterface get() {
+        if (notificationProvider == null) {
             try {
-                Configuration applicationProperties = ApplicationProperties.get();
-                kafka = new KafkaNotification(applicationProperties);
+                Configuration     conf     = ApplicationProperties.get();
+                KafkaNotification kafka    = new KafkaNotification(conf);
+                String            spoolDir = getSpoolDir(conf);
+
+                if (isSpoolingEnabled(conf) && StringUtils.isNotEmpty(spoolDir)) {
+                    LOG.info("Notification spooling is enabled: spool directory={}", spoolDir);
+
+                    conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir);
+
+                    notificationProvider = new AtlasFileSpool(conf, kafka);
+                } else {
+                    LOG.info("Notification spooling is not enabled");
+
+                    notificationProvider = kafka;
+                }
             } catch (AtlasException e) {
                 throw new RuntimeException(e);
             }
         }
-        return kafka;
+        return notificationProvider;
+    }
+
+    private static boolean isSpoolingEnabled(Configuration configuration) {
+        return configuration.getBoolean(CONF_ATLAS_HOOK_SPOOL_ENABLED, CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT);
+    }
+
+    private static String getSpoolDir(Configuration configuration) {
+        return configuration.getString(CONF_ATLAS_HOOK_SPOOL_DIR);
     }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 45a66bf..c45a1da 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -76,6 +76,10 @@ public abstract class AbstractNotification implements NotificationInterface {
     protected AbstractNotification() {
     }
 
+    @Override
+    public void init(String source, Object failedMessagesLogger) {
+    }
+
     // ----- NotificationInterface -------------------------------------------
 
     @Override
@@ -108,7 +112,7 @@ public abstract class AbstractNotification implements NotificationInterface {
      *
      * @throws NotificationException if an error occurs while sending
      */
-    protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
+    public abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;
 
 
     // ----- utility methods -------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java b/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java
new file mode 100644
index 0000000..dc98592
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/LogConfigUtils.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.FileAppender;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.appender.RollingRandomAccessFileAppender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Enumeration;
+
+public class LogConfigUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(LogConfigUtils.class);
+
+    public static String getRootDir() {
+        String ret = getFileAppenderPath();
+
+        if (StringUtils.isEmpty(ret)) {
+            ret = getFileAppenderPathApproach2();
+        }
+
+        if (StringUtils.isNotEmpty(ret)) {
+            ret = StringUtils.substringBeforeLast(ret, File.separator);
+        } else {
+            ret = null;
+        }
+
+        LOG.info("getRootDir(): ret={}", ret);
+
+        return ret;
+    }
+
+    private static String getFileAppenderPath() {
+        String        ret           = StringUtils.EMPTY;
+        LoggerContext loggerContext = (LoggerContext) LogManager.getContext();
+        Configuration configuration = loggerContext.getConfiguration();
+
+        for (Appender appender : configuration.getAppenders().values()) {
+            if (appender instanceof RollingRandomAccessFileAppender) {
+                ret = ((RollingRandomAccessFileAppender) appender).getFileName();
+                break;
+            } else if (appender instanceof RollingFileAppender) {
+                ret =  ((RollingRandomAccessFileAppender) appender).getFileName();
+                break;
+            } else if (appender instanceof FileAppender) {
+                ret =  ((FileAppender) appender).getFileName();
+                break;
+            } else {
+                LOG.info("Could not infer log path from this appender: {}", appender.getClass().getName());
+            }
+        }
+
+        LOG.info("getFileAppenderPath(): ret={}", ret);
+
+        return ret;
+    }
+
+    private static String getFileAppenderPathApproach2() {
+        String ret = null;
+
+        try {
+            org.apache.log4j.Logger rootLogger   = org.apache.log4j.Logger.getRootLogger();
+            Enumeration             allAppenders = rootLogger.getAllAppenders();
+
+            if (allAppenders != null) {
+                while (allAppenders.hasMoreElements()) {
+                    Object appender = allAppenders.nextElement();
+
+                    if (appender instanceof org.apache.log4j.FileAppender) {
+                        org.apache.log4j.FileAppender fileAppender = (org.apache.log4j.FileAppender) appender;
+
+                        ret = fileAppender.getName();
+
+                        break;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("getFileAppenderPathApproach2(): failed to get appender path", e);
+        }
+
+        LOG.info("getFileAppenderPathApproach2(): ret={}", ret);
+
+        return ret;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
index 2dd9c9f..353d650 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
@@ -31,6 +31,10 @@ public class NotificationException extends AtlasException {
         super(e);
     }
 
+    public NotificationException(Exception e, String errorMsg) {
+        super(errorMsg, e);
+    }
+
     public NotificationException(Exception e, List<String> failedMessages) {
         super(e);
         this.failedMessages = failedMessages;
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 6caf7e2..edd8ed9 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -61,6 +61,14 @@ public interface NotificationInterface {
 
     /**
      *
+     * @param source: Name of the source
+     * @param failedMessagesLogger: Logger for failed messages
+     * @return
+     */
+    void init(String source, Object failedMessagesLogger);
+
+    /**
+     *
      * @param user Name of the user under which the processes is running
      */
     void setCurrentUser(String user);
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java b/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java
new file mode 100644
index 0000000..9e12d26
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Archiver.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+
+public class Archiver {
+    private static final Logger LOG = LoggerFactory.getLogger(Archiver.class);
+
+    private final String source;
+    private final File   indexDoneFile;
+    private final File   archiveFolder;
+    private final int    maxArchiveFiles;
+
+    public Archiver(String source, File indexDoneFile, File archiveFolder, int maxArchiveFiles) {
+        this.source          = source;
+        this.indexDoneFile   = indexDoneFile;
+        this.archiveFolder   = archiveFolder;
+        this.maxArchiveFiles = maxArchiveFiles;
+    }
+
+    public void archive(IndexRecord indexRecord) {
+        moveToArchiveDir(indexRecord);
+
+        removeOldFiles();
+    }
+
+    private void moveToArchiveDir(IndexRecord indexRecord) {
+        File spoolFile = null;
+        File archiveFile = null;
+
+        try {
+            spoolFile   = new File(indexRecord.getPath());
+            archiveFile = new File(archiveFolder, spoolFile.getName());
+
+            LOG.info("{}: moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile);
+
+            FileUtils.moveFile(spoolFile, archiveFile);
+        } catch (FileNotFoundException excp) {
+            LOG.warn("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp);
+        } catch (IOException excp) {
+            LOG.error("{}: failed while moving spoolFile={} to archiveFile={}", source, spoolFile, archiveFile, excp);
+        }
+    }
+
+    private void removeOldFiles() {
+        try {
+            File[] logFiles      = archiveFolder == null ? null : archiveFolder.listFiles(pathname -> StringUtils.endsWithIgnoreCase(pathname.getName(), SpoolUtils.FILE_EXT_LOG));
+            int    filesToDelete = logFiles == null ? 0 : logFiles.length - maxArchiveFiles;
+
+            if (filesToDelete > 0) {
+                try (BufferedReader br = new BufferedReader(new FileReader(indexDoneFile))) {
+                    int filesDeletedCount = 0;
+
+                    for (String line = br.readLine(); line != null; line = br.readLine()) {
+                        line = line.trim();
+
+                        if (StringUtils.isEmpty(line)) {
+                            continue;
+                        }
+
+                        try {
+                            IndexRecord record      = AtlasType.fromJson(line, IndexRecord.class);
+                            File        logFile     = new File(record.getPath());
+                            String      fileName    = logFile.getName();
+                            File        archiveFile = new File(archiveFolder, fileName);
+
+                            if (!archiveFile.exists()) {
+                                LOG.warn("archive file does not exist: {}", archiveFile);
+
+                                continue;
+                            }
+
+                            LOG.info("Deleting archive file: {}", archiveFile);
+
+                            boolean ret = archiveFile.delete();
+
+                            if (!ret) {
+                                LOG.error("{}: Error deleting archive file. File: {}", source, archiveFile);
+                            } else {
+                                filesDeletedCount++;
+                            }
+
+                            if (filesDeletedCount >= filesToDelete) {
+                                break;
+                            }
+                        } catch (Exception excp) {
+                            LOG.error("{}: Error deleting older archive file in index-record: {}", source, line, excp);
+                        }
+                    }
+
+                    LOG.info("{}: Deleted: {} archived files", source, filesDeletedCount);
+                }
+            }
+        } catch(Exception exception){
+            LOG.error("{}: Error deleting older files from archive folder. Folder: {}", source, archiveFolder, exception);
+        }
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
new file mode 100644
index 0000000..2d7d195
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class AtlasFileSpool implements NotificationInterface {
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class);
+
+    private final AbstractNotification notificationHandler;
+    private final SpoolConfiguration   config;
+    private final IndexManagement      indexManagement;
+    private final Spooler              spooler;
+    private final Publisher            publisher;
+    private       Thread               publisherThread;
+    private       Boolean              initDone = null;
+
+    public AtlasFileSpool(Configuration configuration, AbstractNotification notificationHandler) {
+        this.notificationHandler = notificationHandler;
+        this.config              = new SpoolConfiguration(configuration, notificationHandler.getClass().getSimpleName());
+        this.indexManagement     = new IndexManagement(config);
+        this.spooler             = new Spooler(config, indexManagement);
+        this.publisher           = new Publisher(config, indexManagement, notificationHandler);
+    }
+
+    @Override
+    public void init(String source, Object failedMessagesLogger) {
+        LOG.info("==> AtlasFileSpool.init(source={})", source);
+
+        if (!isInitDone()) {
+            try {
+                config.setSource(source);
+
+                LOG.info("{}: Initialization: Starting...", this.config.getSourceName());
+
+                indexManagement.init();
+
+                if (failedMessagesLogger instanceof FailedMessagesLogger) {
+                    this.spooler.setFailedMessagesLogger((FailedMessagesLogger) failedMessagesLogger);
+                }
+
+                startPublisher();
+
+                initDone = true;
+            } catch (AtlasException exception) {
+                LOG.error("AtlasFileSpool(source={}): initialization failed", this.config.getSourceName(), exception);
+
+                initDone = false;
+            } catch (Throwable t) {
+                LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), t);
+            }
+        } else {
+            LOG.info("AtlasFileSpool.init(): initialization already done. initDone={}", initDone);
+        }
+
+        LOG.info("<== AtlasFileSpool.init(source={})", source);
+    }
+
+    @Override
+    public void setCurrentUser(String user) {
+        this.notificationHandler.setCurrentUser(user);
+    }
+
+    @Override
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
+        LOG.warn("AtlasFileSpool.createConsumers(): not implemented");
+
+        return null;
+    }
+
+    @Override
+    public <T> void send(NotificationType type, T... messages) throws NotificationException {
+        send(type, Arrays.asList(messages));
+    }
+
+    @Override
+    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        if (hasInitSucceeded() && (this.indexManagement.isPending() || this.publisher.isDestinationDown())) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasFileSpool.send(): sending to spooler");
+            }
+
+            spooler.send(type, messages);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
+            }
+
+            try {
+                notificationHandler.send(type, messages);
+            } catch (Exception e) {
+                if (isInitDone()) {
+                    LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
+
+                    publisher.setDestinationDown();
+
+                    spooler.send(type, messages);
+                } else {
+                    LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not yet initialized", e);
+
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            spooler.setDrain();
+            publisher.setDrain();
+            indexManagement.stop();
+
+            publisherThread.join();
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted! source={}", this.config.getSourceName(), e);
+        }
+    }
+
+    private void startPublisher() {
+        publisherThread = new Thread(publisher);
+
+        publisherThread.setDaemon(true);
+        publisherThread.setContextClassLoader(this.getClass().getClassLoader());
+        publisherThread.start();
+
+        LOG.info("{}: publisher started!", this.config.getSourceName());
+    }
+
+    private boolean isInitDone() {
+        return this.initDone != null;
+    }
+
+    private boolean hasInitSucceeded() {
+        return this.initDone != null && this.initDone == true;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java b/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java
new file mode 100644
index 0000000..538ea49
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/FileOperations.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.utils.local.FileOpAppend;
+import org.apache.atlas.notification.spool.utils.local.FileOpCompaction;
+import org.apache.atlas.notification.spool.utils.local.FileOpDelete;
+import org.apache.atlas.notification.spool.utils.local.FileOpRead;
+import org.apache.atlas.notification.spool.utils.local.FileOpUpdate;
+
+import java.io.File;
+
+public class FileOperations {
+    private final String           emptyRecordJson;
+    private final FileOpAppend     fileOpAppend;
+    private final FileOpRead       fileOpLoad;
+    private final FileOpUpdate     fileOpUpdate;
+    private final FileOpCompaction fileOpCompaction;
+    private final FileOpDelete     fileOpDelete;
+
+    public FileOperations(String emptyRecordJson, String source) {
+        this.emptyRecordJson  = emptyRecordJson;
+        this.fileOpAppend     = new FileOpAppend(source);
+        this.fileOpLoad       = new FileOpRead(source);
+        this.fileOpUpdate     = new FileOpUpdate(source, fileOpAppend);
+        this.fileOpCompaction = new FileOpCompaction(source);
+        this.fileOpDelete     = new FileOpDelete(source);
+    }
+
+    public String[] load(File file) {
+        fileOpLoad.perform(file);
+
+        return fileOpLoad.getItems();
+    }
+
+    public void delete(File file, String id) {
+        fileOpDelete.perform(file, id, emptyRecordJson);
+    }
+
+    public void append(File file, String json) {
+        fileOpAppend.perform(file, json);
+    }
+
+    public void compact(File file) {
+        fileOpCompaction.perform(file);
+    }
+
+    public void update(File file, String id, String json) {
+        fileOpUpdate.setId(id);
+        fileOpUpdate.perform(file, json);
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
new file mode 100644
index 0000000..b3a586b
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.models.IndexRecords;
+import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class IndexManagement {
+    private static final Logger LOG = LoggerFactory.getLogger(IndexManagement.class);
+
+    private static final int MAX_RETRY_ATTEMPTS = 3;
+
+    private final SpoolConfiguration config;
+    private       IndexFileManager   indexFileManager;
+    private       IndexReader        indexReader;
+    private       IndexWriter        indexWriter;
+
+    public IndexManagement(SpoolConfiguration config) {
+        this.config = config;
+    }
+
+    public void init() throws IOException, AtlasException {
+        String sourceName = config.getSourceName();
+
+        File spoolDir = SpoolUtils.getCreateDirectory(config.getSpoolDir());
+
+        if (spoolDir == null) {
+            throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, spoolDir.getAbsolutePath()));
+        }
+
+        File archiveDir = SpoolUtils.getCreateDirectory(config.getArchiveDir());
+
+        if (archiveDir == null) {
+            throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, archiveDir.getAbsolutePath()));
+        }
+
+        File indexFile = SpoolUtils.getCreateFile(config.getIndexFile(), sourceName);
+
+        if (indexFile == null) {
+            throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, indexFile.getAbsolutePath()));
+        }
+
+        File indexDoneFile = SpoolUtils.getCreateFile(config.getIndexDoneFile(), sourceName);
+
+        if (indexDoneFile == null) {
+            throw new AtlasException(String.format("%s: %s not found or inaccessible!", sourceName, indexDoneFile.getAbsolutePath()));
+        }
+
+        performInit(indexFile.getAbsolutePath(), sourceName);
+    }
+
+    @VisibleForTesting
+    void performInit(String indexFilePath, String source) {
+        try {
+            File spoolDir      = config.getSpoolDir();
+            File archiveDir    = config.getArchiveDir();
+            File indexFile     = config.getIndexFile();
+            File indexDoneFile = config.getIndexDoneFile();
+
+            indexFileManager = new IndexFileManager(source, indexFile, indexDoneFile, archiveDir, config.getMaxArchiveFiles());
+            indexReader      = new IndexReader(source, indexFileManager, config.getRetryDestinationMS());
+            indexWriter      = new IndexWriter(source, config, indexFileManager, indexReader, spoolDir, archiveDir, config.getFileRolloverSec());
+        } catch (Exception e) {
+            LOG.error("{}: init: Failed! Error loading records from index file: {}", config.getSourceName(), indexFilePath);
+        }
+    }
+
+    public boolean isPending() {
+        return !indexReader.isEmpty() ||
+                (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0);
+    }
+
+    public synchronized DataOutput getSpoolWriter() throws IOException {
+        return indexWriter.getCreateWriter();
+    }
+
+    public void setSpoolWriteInProgress() {
+        this.indexWriter.setFileWriteInProgress(true);
+    }
+
+    public void resetSpoolWriteInProgress() {
+        this.indexWriter.setFileWriteInProgress(false);
+    }
+
+    public void updateFailedAttempt() {
+        this.indexReader.updateFailedAttempt();
+    }
+
+    public IndexRecord next() throws InterruptedException {
+        return indexReader.next();
+    }
+
+    public int getQueueSize() {
+        return indexReader.size();
+    }
+
+    public void removeAsDone(IndexRecord indexRecord) {
+        this.indexReader.removeAsDone(indexRecord);
+        this.indexWriter.rolloverIfNeeded();
+    }
+
+    public void stop() {
+        indexWriter.stop();
+    }
+
+    public void rolloverSpoolFileIfNeeded() {
+        this.indexWriter.rolloverIfNeeded();
+    }
+
+    @VisibleForTesting
+    IndexFileManager getIndexFileManager() {
+        return this.indexFileManager;
+    }
+
+    public void update(IndexRecord record) {
+        this.indexFileManager.updateIndex(record);
+    }
+
+    public void flushSpoolWriter() throws IOException {
+        this.indexWriter.flushCurrent();
+    }
+
+    static class IndexWriter {
+        private final String              source;
+        private final SpoolConfiguration  config;
+        private final File                spoolFolder;
+        private final File                archiveFolder;
+        private final int                 rollOverTimeout;
+        private final IndexFileManager    indexFileManager;
+        private final IndexReader         indexReader;
+        private final FileLockedReadWrite fileLockedReadWrite;
+        private       IndexRecord         currentIndexRecord;
+        private       DataOutput          currentWriter;
+        private       boolean             fileWriteInProgress;
+
+
+        public IndexWriter(String source, SpoolConfiguration config, IndexFileManager indexFileManager,
+                           IndexReader indexReader,
+                           File spoolFolder, File archiveFolder, int rollOverTimeout) {
+            this.source              = source;
+            this.config              = config;
+            this.indexFileManager    = indexFileManager;
+            this.indexReader         = indexReader;
+            this.spoolFolder         = spoolFolder;
+            this.archiveFolder       = archiveFolder;
+            this.rollOverTimeout     = rollOverTimeout;
+            this.fileLockedReadWrite = new FileLockedReadWrite(source);
+
+            setCurrent(indexFileManager.getFirstWriteInProgressRecord());
+        }
+
+        public void setCurrent(IndexRecord indexRecord) {
+            this.currentIndexRecord = indexRecord;
+        }
+
+        public IndexRecord getCurrent() {
+            return this.currentIndexRecord;
+        }
+
+        private void setCurrentWriter(File file) throws IOException {
+            this.currentWriter = fileLockedReadWrite.getOutput(file);
+        }
+
+        public synchronized DataOutput getWriter() {
+            return this.currentWriter;
+        }
+
+        public synchronized DataOutput getCreateWriter() throws IOException {
+            rolloverIfNeeded();
+
+            if (getCurrent() == null) {
+                IndexRecord record   = new IndexRecord(StringUtils.EMPTY);
+                String      filePath = SpoolUtils.getSpoolFilePath(config, spoolFolder.toString(), archiveFolder.toString(), record.getId());
+
+                record.setPath(filePath);
+
+                indexFileManager.appendToIndexFile(record);
+
+                setCurrent(record);
+
+                LOG.info("IndexWriter.getCreateWriter(source={}): Creating new spool file. File: {}", this.source, filePath);
+
+                setCurrentWriter(new File(filePath));
+            } else {
+                if (this.currentWriter == null) {
+                    LOG.info("IndexWriter.getCreateWriter(source={}): Opening existing file for append: File: {}", this.source, currentIndexRecord.getPath());
+
+                    setCurrentWriter(new File(currentIndexRecord.getPath()));
+                }
+            }
+
+            return currentWriter;
+        }
+
+        public synchronized void rolloverIfNeeded() {
+            if (currentWriter != null && shouldRolloverSpoolFile()) {
+                LOG.info("IndexWriter.rolloverIfNeeded(source={}): Rolling over. Closing File: {}", this.config.getSourceName(), currentIndexRecord.getPath());
+
+                fileLockedReadWrite.close();
+
+                currentWriter = null;
+
+                currentIndexRecord.setStatusPending();
+
+                indexFileManager.updateIndex(currentIndexRecord);
+
+                LOG.info("IndexWriter.rolloverIfNeeded(source={}): Adding file to queue. File: {}", this.config.getSourceName(), currentIndexRecord.getPath());
+
+                indexReader.addToPublishQueue(currentIndexRecord);
+
+                currentIndexRecord = null;
+            }
+        }
+
+        private boolean shouldRolloverSpoolFile() {
+            return currentIndexRecord != null &&
+                    (System.currentTimeMillis() - currentIndexRecord.getCreated() > this.rollOverTimeout);
+        }
+
+        void flushCurrent() throws IOException {
+            DataOutput pw = getWriter();
+
+            if (pw != null) {
+                fileLockedReadWrite.flush();
+            }
+        }
+
+        public void setFileWriteInProgress(boolean val) {
+            this.fileWriteInProgress = val;
+        }
+
+        public boolean isWriteInProgress() {
+            return this.fileWriteInProgress;
+        }
+
+        public void stop() {
+            LOG.info("==> IndexWriter.stop(source={})", this.config.getSourceName());
+
+            try {
+                DataOutput out = getWriter();
+
+                if (out != null) {
+                    flushCurrent();
+
+                    for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+                        if (isWriteInProgress()) {
+                            try {
+                                TimeUnit.SECONDS.sleep(i);
+                            } catch (InterruptedException e) {
+                                LOG.error("IndexWriter.stop(source={}): Interrupted!", this.config.getSourceName(), e);
+
+                                break;
+                            }
+
+                            continue;
+                        }
+
+                        LOG.info("IndexWriter.stop(source={}): Closing open file.", this.config.getSourceName());
+
+                        fileLockedReadWrite.close();
+                        currentIndexRecord.setStatusPending();
+                        indexFileManager.updateIndex(currentIndexRecord);
+
+                        break;
+                    }
+                }
+            } catch (FileNotFoundException e) {
+                LOG.error("IndexWriter.stop(source={}): File not found! {}", this.config.getSourceName(), getCurrent().getPath(), e);
+            } catch (IOException exception) {
+                LOG.error("IndexWriter.stop(source={}): Error accessing file: {}", this.config.getSourceName(), getCurrent().getPath(), exception);
+            } catch (Exception exception) {
+                LOG.error("IndexWriter.stop(source={}): Error closing spool file.", this.config.getSourceName(), exception);
+            }
+
+            LOG.info("<== IndexWriter.stop(source={})", this.config.getSourceName());
+        }
+    }
+
+    static class IndexReader {
+        private final String                     source;
+        private final BlockingQueue<IndexRecord> blockingQueue;
+        private final IndexFileManager           indexFileManager;
+        private final long                       retryDestinationMS;
+        private       IndexRecord                currentIndexRecord;
+
+        public IndexReader(String source, IndexFileManager indexFileManager, long retryDestinationMS) {
+            this.source             = source;
+            this.blockingQueue      = new LinkedBlockingQueue<>();
+            this.retryDestinationMS = retryDestinationMS;
+            this.indexFileManager   = indexFileManager;
+
+            List<IndexRecord> records = indexFileManager.getRecords();
+
+            records.stream().forEach(x -> addIfStatus(x, IndexRecord.STATUS_READ_IN_PROGRESS));
+            records.stream().forEach(x -> addIfStatus(x, IndexRecord.STATUS_PENDING));
+        }
+
+        private void addIfStatus(IndexRecord record, String status) {
+            if (record != null && record.getStatus().equals(status)) {
+                if (!SpoolUtils.fileExists(record)) {
+                    LOG.error("IndexReader.addIfStatus(source={}): file {} not found!", this.source, record.getPath());
+                } else {
+                    addToPublishQueue(record);
+                }
+            }
+        }
+
+        public void addToPublishQueue(IndexRecord record) {
+            try {
+                if (!blockingQueue.contains(record)) {
+                    blockingQueue.add(record);
+                }
+            } catch (OverlappingFileLockException lockException) {
+                LOG.warn("{}: {}: Someone else has locked the file.", source, record.getPath());
+            }
+        }
+
+        public IndexRecord next() throws InterruptedException {
+            this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS);
+
+            return this.currentIndexRecord;
+        }
+
+        public int size() {
+            return blockingQueue.size();
+        }
+
+        public boolean isEmpty() {
+            return blockingQueue.isEmpty();
+        }
+
+        public void updateFailedAttempt() {
+            if (currentIndexRecord != null) {
+                currentIndexRecord.updateFailedAttempt();
+
+                indexFileManager.updateIndex(currentIndexRecord);
+            }
+        }
+
+        public void removeAsDone(IndexRecord indexRecord) {
+            indexRecord.setDone();
+
+            indexFileManager.remove(indexRecord);
+        }
+    }
+
+    static class IndexFileManager {
+        private final String         source;
+        private final File           indexDoneFile;
+        private final File           indexFile;
+        private final Archiver       archiver;
+        private final FileOperations fileOperations;
+
+        public IndexFileManager(String source, File indexFile, File indexDoneFile, File archiveFolder, int maxArchiveFiles) {
+            this.source         = source;
+            this.indexFile      = indexFile;
+            this.indexDoneFile  = indexDoneFile;
+            this.archiver       = new Archiver(source, indexDoneFile, archiveFolder, maxArchiveFiles);
+            this.fileOperations = new FileOperations(SpoolUtils.getEmptyRecordForWriting(), source);
+        }
+
+        public List<IndexRecord> getRecords() {
+            return new ArrayList<>(loadRecords(indexFile).getRecords().values());
+        }
+
+        public synchronized void delete(File file, String id) {
+            fileOperations.delete(file, id);
+        }
+
+        public synchronized IndexRecord getFirstWriteInProgressRecord() {
+            IndexRecord  ret     = null;
+            IndexRecords records = loadRecords(indexFile);
+
+            if (records != null) {
+                for (IndexRecord record : records.getRecords().values()) {
+                    if (record.isStatusWriteInProgress()) {
+                        LOG.info("IndexFileManager.getFirstWriteInProgressRecord(source={}): current file={}", this.source, record.getPath());
+
+                        ret = record;
+
+                        break;
+                    }
+                }
+            }
+
+            return ret;
+        }
+
+        public synchronized void remove(IndexRecord record) {
+            delete(indexFile, record.getId());
+
+            appendToDoneFile(record);
+
+            IndexRecords records = loadRecords(indexFile);
+
+            if (records.size() == 0) {
+                LOG.info("IndexFileManager.remove(source={}): All done!", this.source);
+
+                compactFile(indexFile);
+            }
+        }
+
+        public void appendToIndexFile(IndexRecord record) {
+            fileOperations.append(indexFile, SpoolUtils.getRecordForWriting(record));
+        }
+
+        public void updateIndex(IndexRecord record) {
+            fileOperations.update(indexFile, record.getId(), SpoolUtils.getRecordForWriting(record));
+        }
+
+        private void compactFile(File file) {
+            LOG.info("IndexFileManager.compactFile(source={}): compacting file {}", source, file.getAbsolutePath());
+
+            try {
+                fileOperations.compact(file);
+            } finally {
+                LOG.info("IndexFileManager.compactFile(source={}): done compacting file {}", source, file.getAbsolutePath());
+            }
+        }
+
+        private void appendToDoneFile(IndexRecord indexRecord) {
+            String json = SpoolUtils.getRecordForWriting(indexRecord);
+
+            fileOperations.append(indexDoneFile, json);
+
+            archiver.archive(indexRecord);
+        }
+
+        @VisibleForTesting
+        IndexRecords loadRecords(File file) {
+            String[] items = fileOperations.load(file);
+
+            return SpoolUtils.createRecords(items);
+        }
+
+        @VisibleForTesting
+        File getDoneFile() {
+            return this.indexDoneFile;
+        }
+
+        @VisibleForTesting
+        File getIndexFile() {
+            return this.indexFile;
+        }
+
+        @VisibleForTesting
+        IndexRecord add(String path) {
+            IndexRecord record = new IndexRecord(path);
+
+            appendToIndexFile(record);
+
+            return record;
+        }
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
new file mode 100644
index 0000000..2947a21
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.utils.local.FileLockedReadWrite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class Publisher implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(Publisher.class);
+
+    private final SpoolConfiguration   configuration;
+    private final IndexManagement      indexManagement;
+    private final AbstractNotification notificationHandler;
+    private final String               notificationHandlerName;
+    private final int                  retryDestinationMS;
+    private final int                  messageBatchSize;
+    private       String               source;
+    private       boolean              isDrain;
+    private       boolean              isDestDown;
+
+    public Publisher(SpoolConfiguration configuration, IndexManagement indexManagement, AbstractNotification notificationHandler) {
+        this.configuration           = configuration;
+        this.indexManagement         = indexManagement;
+        this.notificationHandler     = notificationHandler;
+        this.notificationHandlerName = notificationHandler.getClass().getSimpleName();
+        this.retryDestinationMS      = configuration.getRetryDestinationMS();
+        this.messageBatchSize        = configuration.getMessageBatchSize();
+    }
+
+    public void run() {
+        this.source = configuration.getSourceName();
+
+        LOG.info("Publisher.run(source={}): starting publisher {}", this.source, notificationHandlerName);
+
+        try {
+            IndexRecord record = null;
+
+            while (true) {
+                waitIfDestinationDown();
+
+                if (this.isDrain) {
+                    break;
+                }
+
+                record = fetchNext(record);
+
+                if (record != null && processAndDispatch(record)) {
+                    indexManagement.removeAsDone(record);
+
+                    record = null;
+                } else {
+                    indexManagement.rolloverSpoolFileIfNeeded();
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Publisher.run(source={}): {}: Publisher: Shutdown might be in progress!", this.source, notificationHandlerName);
+        } catch (Exception e) {
+            LOG.error("Publisher.run(source={}): {}: Publisher: Exception in destination writing!", this.source, notificationHandlerName, e);
+        }
+
+        LOG.info("Publisher.run(source={}): publisher {} exited!", this.source, notificationHandlerName);
+    }
+
+    public void setDestinationDown() {
+        this.isDestDown = true;
+
+        this.indexManagement.updateFailedAttempt();
+    }
+
+    public void setDrain() {
+        this.isDrain = true;
+    }
+
+    public boolean isDestinationDown() {
+        return isDestDown;
+    }
+
+    private void waitIfDestinationDown() throws InterruptedException {
+        if (isDestDown) {
+            LOG.info("Publisher.waitIfDestinationDown(source={}): {}: Destination is down. Sleeping for: {} ms. Queue: {} items",
+                     this.source, notificationHandlerName, retryDestinationMS, indexManagement.getQueueSize());
+
+            Thread.sleep(retryDestinationMS);
+        }
+
+    }
+
+    private IndexRecord fetchNext(IndexRecord record) {
+        if (record == null) {
+            try {
+                record = indexManagement.next();
+            } catch (Exception e) {
+                LOG.error("Publisher.fetchNext(source={}): failed!. publisher={}", this.source, notificationHandlerName, e);
+            }
+        }
+
+        return record;
+    }
+
+    @VisibleForTesting
+    boolean processAndDispatch(IndexRecord record) throws IOException {
+        boolean ret = true;
+
+        if (SpoolUtils.fileExists(record)) {
+            FileLockedReadWrite fileLockedRead = new FileLockedReadWrite(source);
+
+            try {
+                DataInput dataInput = fileLockedRead.getInput(new File(record.getPath()));
+                int lineInSpoolFile = 0;
+                List<String> messages = new ArrayList<>();
+
+                for (String message = dataInput.readLine(); message != null; message = dataInput.readLine()) {
+                    lineInSpoolFile++;
+
+                    if (lineInSpoolFile < record.getLine()) {
+                        continue;
+                    }
+
+                    messages.add(message);
+
+                    if (messages.size() == messageBatchSize) {
+                        dispatch(record, lineInSpoolFile, messages);
+                    }
+                }
+
+                dispatch(record, lineInSpoolFile, messages);
+
+                LOG.info("Publisher.processAndDispatch(source={}): consumer={}: done reading file {}", this.source, notificationHandlerName, record.getPath());
+
+                ret = true;
+            } catch (OverlappingFileLockException ex) {
+                LOG.error("Publisher.processAndDispatch(source={}): consumer={}: some other process has locked this file {}", this.source, notificationHandlerName, record.getPath(), ex);
+                ret = false;
+            } catch (FileNotFoundException ex) {
+                LOG.error("Publisher.processAndDispatch(source={}): consumer={}: file not found {}", this.source, notificationHandlerName, record.getPath(), ex);
+                ret = true;
+            } catch (Exception ex) {
+                LOG.error("Publisher.processAndDispatch(source={}): consumer={}: failed for file {}", this.source, notificationHandlerName, record.getPath(), ex);
+                ret = false;
+            } finally {
+                fileLockedRead.close();
+            }
+        } else {
+            LOG.error("Publisher.processAndDispatch(source={}): publisher={}: file '{}' not found!", this.source, notificationHandlerName, record.getPath());
+
+            ret = true;
+        }
+
+        return ret;
+    }
+
+    private void dispatch(IndexRecord record, int lineInSpoolFile, List<String> messages) throws Exception {
+        if (notificationHandler == null || messages == null || messages.size() == 0) {
+            LOG.error("Publisher.dispatch(source={}): consumer={}: error sending logs", this.source, notificationHandlerName);
+        } else {
+            dispatch(record.getPath(), messages);
+
+            record.setCurrentLine(lineInSpoolFile);
+            indexManagement.update(record);
+            isDestDown = false;
+        }
+    }
+
+    private void dispatch(String filePath, List<String> messages) throws Exception {
+        try {
+            notificationHandler.sendInternal(NotificationInterface.NotificationType.HOOK, messages);
+
+            if (isDestDown) {
+                LOG.info("Publisher.dispatch(source={}): consumer={}: destination is now up. file={}", this.source, notificationHandlerName, filePath);
+            }
+        } catch (Exception exception) {
+            setDestinationDown();
+
+            LOG.error("Publisher.dispatch(source={}): consumer={}: error while sending logs to consumer", this.source, notificationHandlerName, exception);
+
+            throw new NotificationException(exception, String.format("%s: %s: Publisher: Destination down!", this.source, notificationHandlerName));
+        } finally {
+            messages.clear();
+        }
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
new file mode 100644
index 0000000..a9a3a78
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.commons.configuration.Configuration;
+
+import java.io.File;
+
+public class SpoolConfiguration {
+    private static final int    PROP_RETRY_DESTINATION_MS_DEFAULT               = 30000; // Default 30 seconds
+    private static final int    PROP_FILE_ROLLOVER_SEC_DEFAULT                  = 60; // 60 secs
+    private static final int    PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT = 100;
+    private static final String PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT             = "archive";
+    private static final String PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT               = "/tmp/spool";
+    private static final int    PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT            = 100;
+    private static final String PROPERTY_PREFIX_SPOOL                           = "atlas.hook.spool.";
+    public  static final String PROP_FILE_SPOOL_LOCAL_DIR                       = PROPERTY_PREFIX_SPOOL + "dir";
+    private static final String PROP_FILE_SPOOL_ARCHIVE_DIR                     = PROPERTY_PREFIX_SPOOL + "archive.dir";
+    private static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT         = PROPERTY_PREFIX_SPOOL + "archive.max.files";
+    public  static final String PROP_FILE_SPOOL_FILE_ROLLOVER_SEC               = PROPERTY_PREFIX_SPOOL + "file.rollover.sec";
+    public  static final String PROP_FILE_SPOOL_DEST_RETRY_MS                   = PROPERTY_PREFIX_SPOOL + "destination.retry.ms";
+    private static final String PROP_MESSAGE_BATCH_SIZE                         = PROPERTY_PREFIX_SPOOL + "destination.message.batchsize";
+
+    private final String messageHandlerName;
+    private final int    maxArchivedFilesCount;
+    private final int    messageBatchSize;
+    private final int    retryDestinationMS;
+    private final int    fileRollOverSec;
+    private final int    fileSpoolMaxFilesCount;
+    private final String spoolDirPath;
+    private final String archiveDir;
+    private       String sourceName;
+
+    public SpoolConfiguration(Configuration cfg, String messageHandlerName) {
+        this.messageHandlerName     = messageHandlerName;
+        this.maxArchivedFilesCount  = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
+        this.messageBatchSize       = cfg.getInt(PROP_MESSAGE_BATCH_SIZE, PROP_FILE_MESSAGE_BATCH_SIZE_DEFAULT);
+        this.retryDestinationMS     = cfg.getInt(PROP_FILE_SPOOL_DEST_RETRY_MS, PROP_RETRY_DESTINATION_MS_DEFAULT);
+        this.fileRollOverSec        = cfg.getInt(PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, PROP_FILE_ROLLOVER_SEC_DEFAULT) * 1000;
+        this.fileSpoolMaxFilesCount = cfg.getInt(PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT_DEFAULT);
+        this.spoolDirPath           = cfg.getString(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, PROP_FILE_SPOOL_LOCAL_DIR_DEFAULT);
+        this.archiveDir             = cfg.getString(PROP_FILE_SPOOL_ARCHIVE_DIR, new File(getSpoolDirPath(), PROP_FILE_SPOOL_ARCHIVE_DIR_DEFAULT).toString());
+    }
+
+    public void setSource(String val) {
+        this.sourceName = val;
+    }
+
+    public String getSourceName() {
+        return this.sourceName;
+    }
+
+    public int getMaxArchiveFiles() {
+        return maxArchivedFilesCount;
+    }
+
+    public int getRetryDestinationMS() {
+        return retryDestinationMS;
+    }
+
+    public int getFileRolloverSec() {
+        return this.fileRollOverSec;
+    }
+
+    public int getFileSpoolMaxFilesCount() {
+        return fileSpoolMaxFilesCount;
+    }
+
+    public String getSpoolDirPath() {
+        return spoolDirPath;
+    }
+
+    public File getSpoolDir() {
+        return new File(getSpoolDirPath());
+    }
+
+    public File getArchiveDir() {
+        return new File(this.archiveDir);
+    }
+
+    public String getMessageHandlerName() {
+        return this.messageHandlerName;
+    }
+
+    public int getMessageBatchSize() {
+        return messageBatchSize;
+    }
+
+    public File getIndexFile() {
+        String fileName = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
+
+        return new File(getSpoolDir(), fileName);
+    }
+
+    public File getIndexDoneFile() {
+        String fileName     = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
+        String fileDoneName = SpoolUtils.getIndexDoneFile(fileName);
+
+        return new File(getSpoolDir(), fileDoneName);
+    }
+
+    public File getIndexPublishFile() {
+        String fileName     = SpoolUtils.getIndexFileName(getSourceName(), getMessageHandlerName());
+        String fileDoneName = SpoolUtils.getIndexPublishFile(fileName);
+
+        return new File(getSpoolDir(), fileDoneName);
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
new file mode 100644
index 0000000..abbe33d
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/SpoolUtils.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.models.IndexRecords;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+
+public class SpoolUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(SpoolUtils.class);
+
+    public  static final String           DEFAULT_CHAR_SET              = "UTF-8";
+    private static final String           DEFAULT_LINE_SEPARATOR        = System.getProperty("line.separator");
+    private static final String           FILE_EXT_JSON                 = ".json";
+    public  static final String           FILE_EXT_LOG                  = ".log";
+    private static final String           SPOOL_FILE_NAME_FORMAT_PREFIX = "%s.%s%s";
+    private static final String           INDEX_FILE_CLOSED_SUFFIX      = "_closed.json";
+    private static final String           INDEX_FILE_PUBLISH_SUFFIX     = "_publish.json";
+    private static final String           INDEX_FILE_NAME_FORMAT        = "index-%s-%s" + FILE_EXT_JSON;
+    private static final String           SPOOL_FILE_NAME_FORMAT        = "spool-%s-%s-%s" + FILE_EXT_LOG;
+    private static final String           RECORD_EMPTY = StringUtils.leftPad(StringUtils.EMPTY, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator();
+
+    public static File getCreateFile(File file, String source) throws IOException {
+        if (createFileIfNotExists(file, source)) {
+            LOG.info("SpoolUtils.getCreateFile(source={}): file={}", source, file.getAbsolutePath());
+        }
+
+        return file;
+    }
+
+    public static boolean createFileIfNotExists(File file, String source) throws IOException {
+        boolean ret = file.exists();
+
+        if (!ret) {
+            ret = file.createNewFile();
+
+            if (!ret) {
+                LOG.error("SpoolUtils.createFileIfNotExists(source={}): error creating file {}", source, file.getPath());
+
+                ret = false;
+            }
+        }
+
+        return ret;
+    }
+
+    public static File getCreateDirectory(File file) {
+        File ret = file;
+
+        if (!file.isDirectory()) {
+            boolean result = file.mkdirs();
+
+            if (!file.isDirectory() || !result) {
+                LOG.error("SpoolUtils.getCreateDirectory({}): inaccessible!", file.toString());
+
+                ret = null;
+            }
+        }
+
+        return ret;
+    }
+
+    public static PrintWriter createAppendPrintWriter(File filePath) throws UnsupportedEncodingException, FileNotFoundException {
+        return new PrintWriter(
+                new BufferedWriter(
+                        new OutputStreamWriter(
+                                new FileOutputStream(filePath, true), DEFAULT_CHAR_SET)));
+    }
+
+    public static String getIndexFileName(String source, String handlerName) {
+        return String.format(SpoolUtils.INDEX_FILE_NAME_FORMAT, source, handlerName);
+    }
+
+    public static String getIndexDoneFile(String filePath) {
+        return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_CLOSED_SUFFIX;
+    }
+
+    public static String getIndexPublishFile(String filePath) {
+        return StringUtils.substringBeforeLast(filePath, SpoolUtils.FILE_EXT_JSON) + SpoolUtils.INDEX_FILE_PUBLISH_SUFFIX;
+    }
+
+    public static boolean fileExists(IndexRecord record) {
+        return record != null && new File(record.getPath()).exists();
+    }
+
+    static String getSpoolFileName(String source, String handlerName, String guid) {
+        return String.format(SPOOL_FILE_NAME_FORMAT, source, handlerName, guid);
+    }
+
+    public static String getSpoolFilePath(SpoolConfiguration cfg, String spoolDir, String archiveFolder, String suffix) {
+        File   ret         = null;
+        String fileName    = getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), suffix);
+        int    lastDot   = StringUtils.lastIndexOf(fileName, '.');
+        String baseName  = fileName.substring(0, lastDot);
+        String extension = fileName.substring(lastDot);
+
+        for (int sequence = 1; true; sequence++) {
+            ret = new File(spoolDir, fileName);
+
+            File archiveLogFile = new File(archiveFolder, fileName);
+
+            if (!ret.exists() && !archiveLogFile.exists()) {
+                break;
+            }
+
+            fileName = String.format(SPOOL_FILE_NAME_FORMAT_PREFIX, baseName, sequence, extension);
+        }
+
+        return ret.getPath();
+    }
+
+    public static String getLineSeparator() {
+        return DEFAULT_LINE_SEPARATOR;
+    }
+
+    public static String getRecordForWriting(IndexRecord record) {
+        String json = AtlasType.toJson(record);
+
+        return StringUtils.rightPad(json, IndexRecord.RECORD_SIZE) + SpoolUtils.getLineSeparator();
+    }
+
+    public static String getEmptyRecordForWriting() {
+        return RECORD_EMPTY;
+    }
+
+    public static IndexRecords createRecords(String[] items) {
+        IndexRecords records = new IndexRecords();
+
+        if (items != null && items.length > 0) {
+            try {
+                for (String item : items) {
+                    if (StringUtils.isNotBlank(item)) {
+                        IndexRecord record = AtlasType.fromJson(item, IndexRecord.class);
+
+                        records.getRecords().put(record.getId(), record);
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.error("SpoolUtils.createRecords(): error loading records.", ex);
+            }
+        }
+
+        return records;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
new file mode 100644
index 0000000..2cacaaa
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.hook.FailedMessagesLogger;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutput;
+import java.io.PrintWriter;
+import java.util.List;
+
+public class Spooler extends AbstractNotification {
+    private static final Logger LOG = LoggerFactory.getLogger(Spooler.class);
+
+    private final SpoolConfiguration   configuration;
+    private final IndexManagement      indexManagement;
+    private       FailedMessagesLogger failedMessagesLogger;
+    private       boolean              isDrain;
+
+    public Spooler(SpoolConfiguration configuration, IndexManagement indexManagement) {
+        this.configuration   = configuration;
+        this.indexManagement = indexManagement;
+    }
+
+    public void setFailedMessagesLogger(FailedMessagesLogger failedMessagesLogger) {
+        this.failedMessagesLogger = failedMessagesLogger;
+    }
+
+    public void setDrain() {
+        this.isDrain = true;
+    }
+
+    @Override
+    public <T> List<NotificationConsumer<T>> createConsumers(org.apache.atlas.notification.NotificationInterface.NotificationType notificationType, int numConsumers) {
+        return null;
+    }
+
+    @Override
+    public void sendInternal(NotificationType type, List<String> messages) {
+        boolean ret = write(messages);
+
+        if (failedMessagesLogger != null && !ret) {
+            writeToFailedMessages(messages);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @VisibleForTesting
+    boolean write(List<String> messages) {
+        final boolean ret;
+
+        try {
+            if (!getDrain()) {
+                indexManagement.setSpoolWriteInProgress();
+
+                ret = writeInternal(messages);
+            } else {
+                LOG.error("Spooler.write(source={}): called after stop is called! Write will not be performed!", configuration.getSourceName(), messages);
+
+                ret = false;
+            }
+        } finally {
+            indexManagement.resetSpoolWriteInProgress();
+        }
+
+        return ret;
+    }
+
+    private void writeToFailedMessages(List<String> messages) {
+        if (failedMessagesLogger != null) {
+            for (String message : messages) {
+                failedMessagesLogger.log(message);
+            }
+        }
+    }
+
+    private boolean writeInternal(List<String> messages) {
+        boolean ret = false;
+
+        try {
+            byte[]     lineSeparatorBytes = SpoolUtils.getLineSeparator().getBytes(SpoolUtils.DEFAULT_CHAR_SET);
+            DataOutput pw                 = indexManagement.getSpoolWriter();
+
+            for (String message : messages) {
+                pw.write(message.getBytes(SpoolUtils.DEFAULT_CHAR_SET));
+                pw.write(lineSeparatorBytes);
+            }
+
+            indexManagement.flushSpoolWriter();
+
+            ret = true;
+        } catch (Exception exception) {
+            LOG.error("Spooler.writeInternal(source={}): error writing to file. messages={}", configuration.getSourceName(), messages, exception);
+
+            ret = false;
+        }
+
+        return ret;
+    }
+
+    private boolean getDrain() {
+        return this.isDrain;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java
new file mode 100644
index 0000000..21dad06
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecord.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.models;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.lang.StringUtils;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.UUID;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class IndexRecord implements Serializable {
+    public static final int    RECORD_SIZE              = 500;
+    public static final String STATUS_PENDING           = "PENDING";
+    public static final String STATUS_WRITE_IN_PROGRESS = "WRITE_IN_PROGRESS";
+    public static final String STATUS_READ_IN_PROGRESS  = "READ_IN_PROGRESS";
+    public static final String STATUS_DONE              = "DONE";
+
+    private String  id;
+    private String  path;
+    private int     line;
+    private long    created;
+    private long    writeCompleted;
+    private long    doneCompleted;
+    private long    lastSuccess;
+    private long    lastFailed;
+    private boolean lastAttempt;
+    private int     failedAttempt;
+    private String  status;
+
+    public IndexRecord() {
+        this.status      = STATUS_WRITE_IN_PROGRESS;
+        this.lastAttempt = false;
+    }
+
+    public IndexRecord(String path) {
+        this.id            = UUID.randomUUID().toString();
+        this.path          = path;
+        this.failedAttempt = 0;
+        this.status        = STATUS_WRITE_IN_PROGRESS;
+        this.created       = System.currentTimeMillis();
+
+        setLastAttempt(false);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexRecord [id=" + id + ", filePath=" + path
+                + ", linePosition=" + line + ", status=" + status
+                + ", fileCreateTime=" + created
+                + ", writeCompleteTime=" + writeCompleted
+                + ", doneCompleteTime=" + doneCompleted
+                + ", lastSuccessTime=" + lastSuccess
+                + ", lastFailedTime=" + lastFailed
+                + ", failedAttemptCount=" + failedAttempt
+                + ", lastAttempt=" + lastAttempt + "]";
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    public void setPath(String path) {
+        this.path = path;
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+    public void setLine(int line) {
+        this.line = line;
+    }
+
+    public int getLine() {
+        return line;
+    }
+
+    public void setCreated(long fileCreateTime) {
+        this.created = fileCreateTime;
+    }
+
+    public long getCreated() {
+        return this.created;
+    }
+
+    public void setWriteCompleted(long writeCompleted) {
+        this.writeCompleted = writeCompleted;
+    }
+
+    public long getWriteCompleted() {
+        return this.writeCompleted;
+    }
+
+    public void setDoneCompleted(long doneCompleted) {
+        this.doneCompleted = doneCompleted;
+    }
+
+    public long getDoneCompleted() {
+        return doneCompleted;
+    }
+
+    public void setLastSuccess(long lastSuccess) {
+        this.lastSuccess = lastSuccess;
+    }
+
+    public long getLastSuccess() {
+        return lastSuccess;
+    }
+
+    public void setLastFailed(long lastFailed) {
+        this.lastFailed = lastFailed;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public String getStatus() {
+        return this.status;
+    }
+
+    public void setLastAttempt(boolean lastAttempt) {
+        this.lastAttempt = lastAttempt;
+    }
+
+    public void setFailedAttempt(int failedAttempt) {
+        this.failedAttempt = failedAttempt;
+    }
+
+    public int getFailedAttempt() {
+        return failedAttempt;
+    }
+
+    @JsonIgnore
+    public void setDone() {
+        setStatus(IndexRecord.STATUS_DONE);
+        setDoneCompleted(System.currentTimeMillis());
+        setLastAttempt(true);
+    }
+
+    @JsonIgnore
+    public void setStatusPending() {
+        setStatus(IndexRecord.STATUS_PENDING);
+        setWriteCompleted(System.currentTimeMillis());
+        setLastAttempt(true);
+    }
+
+    @JsonIgnore
+    public void updateFailedAttempt() {
+        setLastFailed(System.currentTimeMillis());
+        incrementFailedAttemptCount();
+        setLastAttempt(false);
+    }
+
+    @JsonIgnore
+    public boolean equals(IndexRecord record) {
+        return this.id.equals(record.getId());
+    }
+
+    @JsonIgnore
+    public void setCurrentLine(int line) {
+        setLine(line);
+        setStatus(STATUS_READ_IN_PROGRESS);
+        setLastSuccess(System.currentTimeMillis());
+        setLastAttempt(true);
+    }
+
+    @JsonIgnore
+    public boolean isStatusDone() {
+        return this.status.equals(STATUS_DONE);
+    }
+
+    @JsonIgnore
+    public boolean isStatusWriteInProgress() {
+        return this.status.equals(STATUS_WRITE_IN_PROGRESS);
+    }
+
+    @JsonIgnore
+    public boolean isStatusReadInProgress() {
+        return status.equals(IndexRecord.STATUS_READ_IN_PROGRESS);
+    }
+
+    @JsonIgnore
+    public void incrementFailedAttemptCount() {
+        this.failedAttempt++;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java
new file mode 100644
index 0000000..abcb837
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/models/IndexRecords.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.models;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class IndexRecords implements Serializable {
+    private LinkedHashMap<String, IndexRecord> records;
+
+    public IndexRecords() {
+        this.records = new LinkedHashMap<>();
+    }
+
+    public Map<String, IndexRecord> getRecords() {
+        return records;
+    }
+
+    public void setRecords(LinkedHashMap<String, IndexRecord> records) {
+        this.records = records;
+    }
+
+    @JsonIgnore
+    public int size() {
+        LinkedHashMap<String, IndexRecord> records = this.records;
+
+        return records != null ? records.size() : 0;
+    }
+
+    @JsonIgnore
+    public void remove(IndexRecord record) {
+        LinkedHashMap<String, IndexRecord> records = this.records;
+
+        if (records != null) {
+            records.remove(record.getId());
+        }
+    }
+
+    @JsonIgnore
+    public void add(IndexRecord record) {
+        LinkedHashMap<String, IndexRecord> records = this.records;
+
+        if (records == null) {
+            records = new LinkedHashMap<>();
+
+            this.records = records;
+        }
+
+        records.put(record.getId(), record);
+    }
+
+    @JsonIgnore
+    public void delete(IndexRecord record) {
+        remove(record);
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java
new file mode 100644
index 0000000..5d5ad8c
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileLockedReadWrite.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+public class FileLockedReadWrite extends FileOperation {
+    private RandomAccessFile raf;
+    private FileChannel      channel;
+    private FileLock         lock;
+
+    public FileLockedReadWrite(String source) {
+        super(source);
+    }
+
+    @Override
+    public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
+        this.raf     = randomAccessFile;
+        this.channel = channel;
+        this.lock    = channel.tryLock();
+
+        return lock;
+    }
+
+    public DataInput getInput(File file) throws IOException {
+        return getRaf(file);
+    }
+
+    public DataOutput getOutput(File file) throws IOException {
+        return getRaf(file);
+    }
+
+    public void flush() throws IOException {
+        if (channel != null) {
+            channel.force(true);
+        }
+    }
+
+    public void close() {
+        super.close(this.raf, this.channel, this.lock);
+    }
+
+    private RandomAccessFile getRaf(File file) throws IOException {
+        RandomAccessFile raf = new RandomAccessFile(file, "rws");
+
+        run(raf, raf.getChannel(), StringUtils.EMPTY);
+
+        return raf;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java
similarity index 56%
copy from notification/src/main/java/org/apache/atlas/notification/NotificationException.java
copy to notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java
index 2dd9c9f..bfc113d 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpAppend.java
@@ -15,28 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.atlas.notification;
+package org.apache.atlas.notification.spool.utils.local;
 
-import org.apache.atlas.AtlasException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
 
-import java.util.List;
+public class FileOpAppend extends FileOperation {
 
-/**
- * Exception from notification.
- */
-public class NotificationException extends AtlasException {
-    private List<String> failedMessages;
-
-    public NotificationException(Exception e) {
-        super(e);
+    public FileOpAppend(String source) {
+        super(source);
     }
 
-    public NotificationException(Exception e, List<String> failedMessages) {
-        super(e);
-        this.failedMessages = failedMessages;
-    }
+    @Override
+    public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
+        FileLock lock = channel.tryLock(randomAccessFile.length(), json.length(), false);
+
+        channel.position(randomAccessFile.length());
+
+        randomAccessFile.writeBytes(json);
 
-    public List<String> getFailedMessages() {
-        return failedMessages;
+        return lock;
     }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java
new file mode 100644
index 0000000..8240a80
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpCompaction.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+public class FileOpCompaction extends FileOperation {
+    private final FileOpRead fileOpLoad;
+
+    public FileOpCompaction(String source) {
+        super(source);
+
+        this.fileOpLoad = new FileOpRead(source);
+    }
+
+    @Override
+    public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
+        FileLock lock = file.getChannel().tryLock();
+
+        fileOpLoad.perform(getFile(), StringUtils.EMPTY);
+
+        file.getChannel().truncate(0);
+
+        String[] rawItems = fileOpLoad.getItems();
+
+        if (rawItems != null) {
+            for (String record : rawItems) {
+                if (StringUtils.isNotBlank(record)) {
+                    file.writeBytes(record);
+                }
+            }
+        }
+
+        return lock;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java
similarity index 52%
copy from notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
copy to notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java
index 2dd970e..19243a6 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/NotificationProvider.java
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpDelete.java
@@ -15,27 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.atlas.kafka;
+package org.apache.atlas.notification.spool.utils.local;
 
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
-import org.apache.commons.configuration.Configuration;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
 
-/**
- * Provider class for Notification interfaces
- */
-public class NotificationProvider {
-    private static KafkaNotification kafka;
+public class FileOpDelete extends FileOperation {
+    public FileOpDelete(String source) {
+        super(source);
+    }
+
+    @Override
+    public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
+        final FileLock ret;
+        final long     position = find(file, getId());
 
-    public static KafkaNotification get() {
-        if (kafka == null) {
-            try {
-                Configuration applicationProperties = ApplicationProperties.get();
-                kafka = new KafkaNotification(applicationProperties);
-            } catch (AtlasException e) {
-                throw new RuntimeException(e);
-            }
+        if (position < 0) {
+            ret = null;
+        } else {
+            ret = channel.tryLock(position, json.length(), false);
+
+            channel.position(position);
+
+            file.writeBytes(json);
         }
-        return kafka;
+
+        return ret;
     }
 }
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java
new file mode 100644
index 0000000..b228090
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpRead.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.atlas.notification.spool.SpoolUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+public class FileOpRead extends FileOperation {
+    private static final Logger LOG = LoggerFactory.getLogger(FileOpRead.class);
+
+    private String[] items;
+
+    public FileOpRead(String source) {
+        super(source);
+    }
+
+    @Override
+    public FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException {
+        items = null;
+
+        byte[] bytes = new byte[(int) randomAccessFile.length()];
+
+        randomAccessFile.readFully(bytes);
+
+        int    rawRecords = 0;
+        String allRecords = new String(bytes);
+
+        if (StringUtils.isNotEmpty(allRecords)) {
+            items = StringUtils.split(allRecords, SpoolUtils.getLineSeparator());
+
+            if (items != null) {
+                rawRecords = items.length;
+            }
+        }
+
+        LOG.info("FileOpRead.run(source={}): loaded file {}, raw records={}", this.getSource(), this.getFile().getAbsolutePath(), rawRecords);
+
+        return null;
+    }
+
+    public String[] getItems() {
+        return items;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java
new file mode 100644
index 0000000..cd58e86
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOpUpdate.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+public class FileOpUpdate extends FileOperation {
+    private       String       id;
+    private final FileOpAppend fileOpAppend;
+
+    public FileOpUpdate(String source, FileOpAppend fileOpAppend) {
+        super(source);
+
+        this.fileOpAppend = fileOpAppend;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public FileLock run(RandomAccessFile file, FileChannel channel, String json) throws IOException {
+        final FileLock ret;
+        final long     position = find(file, getId());
+
+        if (position < 0) {
+            ret = fileOpAppend.run(file, channel, json);
+        } else {
+            ret = channel.tryLock(position, json.length(), false);
+
+            channel.position(position);
+
+            file.writeBytes(json);
+        }
+
+        return ret;
+    }
+}
diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java
new file mode 100644
index 0000000..e5bf9d2
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/notification/spool/utils/local/FileOperation.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool.utils.local;
+
+import org.apache.atlas.notification.spool.SpoolUtils;
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.concurrent.TimeUnit;
+
+public abstract class FileOperation {
+    private static final Logger LOG = LoggerFactory.getLogger(FileOperation.class);
+
+    private static final int    MAX_RETRY_ATTEMPTS               = 5;
+    private static final String RANDOM_ACCESS_FILE_OPEN_MODE_RWS = "rws";
+    private static final String RANDOM_ACCESS_FILE_OPEN_MODE_R   = "r";
+
+    private final String source;
+    private       File   file;
+    private       String id;
+
+    public static RandomAccessFile createRandomAccessFileForRead(File file) throws FileNotFoundException {
+        return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_R);
+    }
+
+    public static RandomAccessFile createRandomAccessFile(File file) throws FileNotFoundException {
+        return new RandomAccessFile(file, RANDOM_ACCESS_FILE_OPEN_MODE_RWS);
+    }
+
+    public static long find(RandomAccessFile raf, String id) throws IOException {
+        while (true) {
+            String line = raf.readLine();
+
+            if (StringUtils.isEmpty(line)) {
+                break;
+            }
+
+            if (line.contains(id)) {
+                return raf.getChannel().position() - SpoolUtils.getLineSeparator().length() - IndexRecord.RECORD_SIZE;
+            }
+        }
+
+        return -1;
+    }
+
+    public FileOperation(String source) {
+        this(source, false);
+    }
+
+    public FileOperation(String source, boolean notifyConcurrency) {
+        this.source = source;
+    }
+
+    public String getSource() {
+        return source;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void perform(File file) {
+        perform(file, StringUtils.EMPTY);
+    }
+
+    public void perform(File file, String json) {
+        setFile(file);
+
+        performWithRetry(file, json);
+    }
+
+    public void perform(File file, String id, String json) {
+        this.setId(id);
+        perform(file, json);
+    }
+
+    public abstract FileLock run(RandomAccessFile randomAccessFile, FileChannel channel, String json) throws IOException;
+
+
+    protected File getFile() {
+        return this.file;
+    }
+
+    protected String getId() {
+        return this.id;
+    }
+
+    protected void close(RandomAccessFile randomAccessFile, FileChannel channel, FileLock lock) {
+        try {
+            if (channel != null) {
+                channel.force(true);
+            }
+
+            if (lock != null) {
+                lock.release();
+            }
+
+            if (channel != null) {
+                channel.close();
+            }
+
+            if (randomAccessFile != null) {
+                randomAccessFile.close();
+            }
+        } catch (IOException exception) {
+            LOG.error("FileOperation(source={}).close(): failed", getSource(), exception);
+        }
+    }
+
+
+    private void setFile(File file) {
+        this.file = file;
+    }
+
+    private void performWithRetry(File file, String json) {
+        for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+            try {
+                performOperation(json);
+                return;
+            } catch (OverlappingFileLockException e) {
+                try {
+                    int timeout = 1 + (50 * RandomUtils.nextInt(10));
+                    LOG.info("FileOperation.performWithRetry(source={}): {}: {}: Waiting: {} ms...", getSource(), getClass().getSimpleName(), file.getName(), timeout);
+
+                    TimeUnit.MILLISECONDS.sleep(timeout);
+                } catch (InterruptedException ex) {
+                    LOG.error("FileOperation.performWithRetry(source={}): {}: Interrupted!", getSource(), file.getAbsolutePath(), ex);
+                }
+
+                LOG.info("FileOperation.performWithRetry(source={}): {}: Re-trying: {}!", getSource(), file.getAbsolutePath(), i);
+            }
+        }
+
+        LOG.info("FileOperation.performWithRetry(source={}): {}: appendRecord: Could not write.", getSource(), file.getAbsolutePath());
+    }
+
+    private boolean performOperation(String json) {
+        RandomAccessFile randomAccessFile = null;
+        FileChannel      channel          = null;
+        FileLock         lock             = null;
+
+        try {
+            randomAccessFile = new RandomAccessFile(getFile(), "rws");
+            channel          = randomAccessFile.getChannel();
+            lock             = run(randomAccessFile, channel, json);
+        } catch (FileNotFoundException e) {
+            LOG.error("FileOperation.performOperation(source={}): file={}: file not found", getSource(), getFile().getAbsolutePath(), e);
+        } catch (IOException exception) {
+            LOG.error("FileOperation.performOperation(source={}): file={}: failed", getSource(), getFile().getAbsolutePath());
+        } finally {
+            close(randomAccessFile, channel, lock);
+        }
+
+        return true;
+    }
+}
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
index 94cb70d..d7e4959 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java
@@ -112,7 +112,7 @@ public class AbstractNotificationTest {
         }
 
         @Override
-        protected void sendInternal(NotificationType notificationType, List<String> notificationMessages)
+        public void sendInternal(NotificationType notificationType, List<String> notificationMessages)
             throws NotificationException {
 
             type     = notificationType;
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
new file mode 100644
index 0000000..167efbe
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.notification.AbstractNotification;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.atlas.notification.NotificationInterface.NotificationType.HOOK;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class AtlasFileSpoolTest extends BaseTest {
+    private static int MAX_RECORDS = 50;
+
+    private static class MessageHandlerSpy extends AbstractNotification {
+
+        private List<String> publishedMessages = new ArrayList<>();
+
+        public List<String> getMessages() {
+            return publishedMessages;
+        }
+
+        @Override
+        public void init(String source, Object failedMessagesLogger) {
+        }
+
+        @Override
+        public void setCurrentUser(String user) {
+
+        }
+
+        @Override
+        public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
+            publishedMessages.addAll(messages);
+
+        }
+
+        @Override
+        public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
+            return null;
+        }
+
+        @Override
+        public <T> void send(NotificationType type, T... messages) throws NotificationException {
+        }
+
+        @Override
+        public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+    @Test
+    public void indexSetupMultipleTimes() throws IOException, AtlasException {
+        SpoolConfiguration cfg = getSpoolConfiguration();
+        IndexManagement indexManagement = new IndexManagement(cfg);
+
+        for (int i = 0; i < 2; i++) {
+            indexManagement.init();
+            assertTrue(cfg.getSpoolDir().exists());
+            assertTrue(cfg.getArchiveDir().exists());
+
+            File indexFile = indexManagement.getIndexFileManager().getIndexFile();
+            File indexDoneFile = indexManagement.getIndexFileManager().getDoneFile();
+
+            assertTrue(indexFile.exists(), "File not created: " + indexFile.getAbsolutePath());
+            assertTrue(indexDoneFile.exists(), "File not created: " + indexDoneFile.getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void spoolerTest() throws IOException, AtlasException {
+        SpoolConfiguration cfg = getSpoolConfigurationTest();
+        IndexManagement indexManagement = new IndexManagement(cfg);
+
+        indexManagement.init();
+        Spooler spooler = new Spooler(cfg, indexManagement);
+        for (int i = 0; i < MAX_RECORDS; i++) {
+            spooler.write(Collections.singletonList("message: " + i));
+        }
+
+        indexManagement.stop();
+    }
+
+    @Test(dependsOnMethods = "spoolerTest")
+    public void publisherTest() throws IOException, AtlasException, InterruptedException {
+        SpoolConfiguration cfg = getSpoolConfigurationTest();
+
+        IndexManagement indexManagement = new IndexManagement(cfg);
+        indexManagement.init();
+
+        MessageHandlerSpy messageHandler = new MessageHandlerSpy();
+        Publisher publisher = new Publisher(cfg, indexManagement, messageHandler);
+        boolean ret = publisher.processAndDispatch(indexManagement.getIndexFileManager().getRecords().get(0));
+
+        publisher.setDrain();
+        Assert.assertTrue(ret);
+        TimeUnit.SECONDS.sleep(5);
+        assertTrue(messageHandler.getMessages().size() >= 0);
+    }
+
+    @Test
+    public void indexRecordsRead() throws IOException, AtlasException {
+        SpoolConfiguration spoolCfg = getSpoolConfigurationTest();
+        IndexManagement indexManagement = new IndexManagement(spoolCfg);
+        indexManagement.init();
+
+    }
+
+    @Test
+    public void concurrentWriteAndPublish() throws InterruptedException, IOException, AtlasException {
+        final int MAX_PROCESSES = 4;
+        SpoolConfiguration spoolCfg = getSpoolConfigurationTest(5);
+
+        IndexManagement[] im1 = new IndexManagement[MAX_PROCESSES];
+        MessageHandlerSpy[] messageHandlerSpy = new MessageHandlerSpy[MAX_PROCESSES];
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            messageHandlerSpy[i] = new MessageHandlerSpy();
+            im1[i] = new IndexManagement(spoolCfg);
+        }
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            im1[i].init();
+        }
+
+        IndexManagement imVerify = new IndexManagement(spoolCfg);
+        imVerify.init();
+        Assert.assertTrue(imVerify.getIndexFileManager().getRecords().size() >= 0);
+
+        Thread[] th1 = new Thread[MAX_PROCESSES];
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            th1[i] = new Thread(new MessagePump(new Spooler(spoolCfg, im1[i]), new Publisher(spoolCfg, im1[i], messageHandlerSpy[i])));
+        }
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            th1[i].start();
+        }
+
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            th1[i].join();
+        }
+
+        imVerify = new IndexManagement(spoolCfg);
+        imVerify.init();
+        Assert.assertEquals(imVerify.getIndexFileManager().getRecords().size(), 0);
+        for (int i = 0; i < MAX_PROCESSES; i++) {
+            Assert.assertTrue(messageHandlerSpy[i].getMessages().size() >= 0);
+        }
+    }
+
+    private class MessagePump implements Runnable {
+
+        private Spooler spooler;
+        private Publisher publisher;
+        private Thread publisherThread;
+
+        public MessagePump(Spooler spooler, Publisher publisher) {
+            this.spooler = spooler;
+            this.publisher = publisher;
+        }
+
+        @Override
+        public void run() {
+            publisherThread = new Thread(publisher);
+            publisherThread.start();
+
+            for (int i = 0; i < MAX_RECORDS; i++) {
+                try {
+                    spooler.send(HOOK, String.format("%s-%s", "message", i));
+
+                    Thread.sleep(RandomUtils.nextInt(10, 100));
+                } catch (NotificationException exception) {
+                    exception.printStackTrace();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+
+            try {
+                Thread.sleep(10000);
+                publisher.setDrain();
+                publisherThread.join();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+    @AfterClass
+    public void tearDown() {
+        FileUtils.deleteQuietly(new File(spoolDirTest));
+    }
+}
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
new file mode 100644
index 0000000..7ca745f
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/BaseTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+
+public class BaseTest {
+    public static String spoolDir = System.getProperty("user.dir") + "/src/test/resources/spool";
+    public static String spoolDirTest = spoolDir + "-test";
+    protected final String SOURCE_TEST = "test-src";
+    protected final String SOURCE_TEST_HANDLER = "1";
+
+    protected final String knownIndexFilePath = "index-test-src-1.json";
+    protected final String knownIndexDoneFilePath = "index-test-src-1_closed.json";
+
+    protected File archiveDir = new File(spoolDir, "archive");
+    protected File indexFile = new File(spoolDir, knownIndexFilePath);
+    protected File indexDoneFile = new File(spoolDir, knownIndexDoneFilePath);
+
+    public SpoolConfiguration getSpoolConfiguration() {
+        return getSpoolConfiguration(spoolDir, SOURCE_TEST_HANDLER);
+    }
+
+    public SpoolConfiguration getSpoolConfigurationTest() {
+        return getSpoolConfiguration(spoolDirTest, SOURCE_TEST_HANDLER);
+    }
+    public SpoolConfiguration getSpoolConfigurationTest(Integer testId) {
+        return getSpoolConfiguration(spoolDirTest, testId.toString());
+    }
+
+    public SpoolConfiguration getSpoolConfiguration(String spoolDir, String handlerName) {
+        SpoolConfiguration cfg = new SpoolConfiguration(getConfiguration(spoolDir), handlerName);
+        cfg.setSource(SOURCE_TEST);
+        return cfg;
+    }
+
+    public Configuration getConfiguration(String spoolDir) {
+        final int destinationRetry = 2000;
+
+        PropertiesConfiguration props = new PropertiesConfiguration();
+        props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_LOCAL_DIR, spoolDir);
+        props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_DEST_RETRY_MS, Integer.toString(destinationRetry));
+        props.setProperty(SpoolConfiguration.PROP_FILE_SPOOL_FILE_ROLLOVER_SEC, Integer.toString(2));
+        return props;
+    }
+
+    protected File getNewIndexFile(char id) throws IOException {
+        File f = new File(spoolDirTest, knownIndexFilePath.replace('1', id));
+        FileUtils.copyFile(indexFile, f);
+        return f;
+    }
+
+    protected File getNewIndexDoneFile(char id) throws IOException {
+        File f = new File(spoolDirTest, knownIndexDoneFilePath.replace('1', id));
+        FileUtils.copyFile(indexDoneFile, f);
+        return f;
+    }
+}
diff --git a/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java b/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java
new file mode 100644
index 0000000..f9d2a06
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/notification/spool/IndexManagementTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.spool;
+
+import org.apache.atlas.notification.spool.models.IndexRecord;
+import org.apache.atlas.notification.spool.models.IndexRecords;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class IndexManagementTest extends BaseTest {
+    @Test
+    public void fileNameGeneration() {
+        String handlerName = "someHandler";
+        SpoolConfiguration cfg = getSpoolConfiguration(spoolDir, handlerName);
+
+        IndexRecord record = new IndexRecord(StringUtils.EMPTY);
+        Assert.assertEquals(SpoolUtils.getIndexFileName(cfg.getSourceName(), cfg.getMessageHandlerName()), "index-test-src-someHandler.json");
+        Assert.assertTrue(SpoolUtils.getSpoolFileName(cfg.getSourceName(), cfg.getMessageHandlerName(), record.getId()).startsWith("spool-test-src-someHandler-"));
+    }
+
+    @Test
+    public void verifyLoad() throws IOException {
+        final int expectedRecords = 2;
+        SpoolConfiguration cfg = getSpoolConfiguration();
+
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, cfg.getIndexFile(), cfg.getIndexDoneFile(), null, 2);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), expectedRecords);
+
+        Assert.assertEquals(indexFileManager.getRecords().get(0).getId(), "1");
+        Assert.assertEquals(indexFileManager.getRecords().get(1).getId(), "2");
+    }
+
+    @Test
+    public void addAndRemove() throws IOException {
+        File newIndexFile = getNewIndexFile('3');
+        File newIndexDoneFile = getNewIndexDoneFile('3');
+
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2);
+
+        int expectedCount = 2;
+        Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount);
+
+        IndexRecord r3 = indexFileManager.add("3.log");
+        IndexRecord r4 = indexFileManager.add("4.log");
+        IndexRecord r5 = indexFileManager.add("5.log");
+
+        r4.updateFailedAttempt();
+        indexFileManager.updateIndex(r4);
+
+        r5.setLine(100);
+        indexFileManager.updateIndex(r5);
+
+        IndexRecords records = indexFileManager.loadRecords(newIndexFile);
+        Assert.assertTrue(records.getRecords().containsKey(r3.getId()));
+        Assert.assertTrue(records.getRecords().containsKey(r4.getId()));
+        Assert.assertTrue(records.getRecords().containsKey(r5.getId()));
+
+        Assert.assertEquals(records.getRecords().get(r3.getId()).getStatus(), r3.getStatus());
+        Assert.assertEquals(records.getRecords().get(r4.getId()).getFailedAttempt(), r4.getFailedAttempt());
+        Assert.assertEquals(records.getRecords().get(r5.getId()).getLine(), r5.getLine());
+
+        indexFileManager.remove(r3);
+        indexFileManager.remove(r4);
+        indexFileManager.remove(r5);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), expectedCount);
+    }
+
+    @Test
+    public void verifyOperations() throws IOException {
+        SpoolConfiguration cfg = getSpoolConfigurationTest();
+
+        File newIndexFile = getNewIndexFile('2');
+        File newIndexDoneFile = getNewIndexDoneFile('2');
+
+        File archiveDir = cfg.getArchiveDir();
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexFile, newIndexDoneFile, null, 2);
+
+        verifyAdding(indexFileManager);
+        verifySaveAndLoad(indexFileManager);
+        verifyRemove(indexFileManager);
+        verifyRecords(indexFileManager);
+
+        checkDoneFile(newIndexDoneFile, archiveDir, 2, "5.log");
+
+        verifyArchiving(indexFileManager);
+    }
+
+    private void verifyRecords(IndexManagement.IndexFileManager indexFileManager) {
+        List<IndexRecord> records = indexFileManager.getRecords();
+
+        Assert.assertEquals(records.size(), 5);
+        Assert.assertTrue(records.get(3).getPath().endsWith("3.log"));
+        Assert.assertEquals(records.get(3).getStatus(), IndexRecord.STATUS_WRITE_IN_PROGRESS);
+        Assert.assertEquals(records.get(2).getFailedAttempt(), 0);
+        Assert.assertEquals(records.get(1).getDoneCompleted(), 0);
+        Assert.assertEquals(records.get(0).getLine(), 0);
+        Assert.assertFalse(records.get(0).getLastSuccess() != 0);
+    }
+
+    private void verifyAdding(IndexManagement.IndexFileManager indexFileManager) throws IOException {
+        addFile(indexFileManager, spoolDirTest, "2.log");
+        addFile(indexFileManager, spoolDirTest, "3.log");
+        addFile(indexFileManager, spoolDirTest, "4.log");
+        addFile(indexFileManager, spoolDirTest, "5.log");
+    }
+
+    private void verifyArchiving(IndexManagement.IndexFileManager indexFileManager) {
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+        indexFileManager.remove(indexFileManager.getRecords().get(1));
+
+        checkArchiveDir(archiveDir);
+    }
+
+    private void verifyRemove(IndexManagement.IndexFileManager indexFileManager) throws IOException {
+        indexFileManager.remove(indexFileManager.getRecords().get(5));
+
+        boolean isPending = indexFileManager.getRecords().size() > 0;
+        Assert.assertTrue(isPending);
+    }
+
+    private void verifySaveAndLoad(IndexManagement.IndexFileManager indexFileManager) throws IOException {
+        indexFileManager.getRecords().get(2).updateFailedAttempt();
+        indexFileManager.getRecords().get(3).setDone();
+        indexFileManager.getRecords().get(1).setDoneCompleted(333l);
+        indexFileManager.getRecords().get(0).setCurrentLine(999);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), 6);
+    }
+
+    private void checkArchiveDir(File archiveDir) {
+        Set<String> availableFiles = new HashSet<>();
+        availableFiles.add(new File(archiveDir, "3.log").toString());
+        availableFiles.add(new File(archiveDir, "4.log").toString());
+
+        if (!archiveDir.exists()) {
+            return;
+        }
+
+        File[] files = archiveDir.listFiles();
+        Assert.assertNotNull(files);
+        Assert.assertEquals(files.length, 1);
+    }
+
+    private void addFile(IndexManagement.IndexFileManager indexFileManager, String dir, String fileName) throws IOException {
+        File file = new File(dir, fileName);
+        file.createNewFile();
+        indexFileManager.add(file.toString());
+    }
+
+    private void checkDoneFile(File newIndexDoneFile, File archiveDir, int maxArchiveFiles, String expectedFilePath) throws IOException {
+        IndexManagement.IndexFileManager indexFileManager = new IndexManagement.IndexFileManager(SOURCE_TEST, newIndexDoneFile, newIndexDoneFile, null, maxArchiveFiles);
+
+        Assert.assertEquals(indexFileManager.getRecords().size(), 2);
+        Assert.assertTrue(indexFileManager.getRecords().get(1).getPath().endsWith(expectedFilePath));
+    }
+
+    @AfterClass
+    public void tearDown() {
+        FileUtils.deleteQuietly(new File(spoolDirTest));
+    }
+}
diff --git a/notification/src/test/resources/spool/archive/spool-1.json b/notification/src/test/resources/spool/archive/spool-1.json
new file mode 100644
index 0000000..bcbcecf
--- /dev/null
+++ b/notification/src/test/resources/spool/archive/spool-1.json
@@ -0,0 +1,3 @@
+message-0
+message-1
+message-2
\ No newline at end of file
diff --git a/notification/src/test/resources/spool/index-test-src-1.json b/notification/src/test/resources/spool/index-test-src-1.json
new file mode 100644
index 0000000..d4cee87
--- /dev/null
+++ b/notification/src/test/resources/spool/index-test-src-1.json
@@ -0,0 +1,2 @@
+{"id":"1","path":"0.log","line":0,"created":123,"writeCompleted":888,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"PENDING"}
+{"id":"2","path":"1.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"WRITE_IN_PROGRESS"}
diff --git a/notification/src/test/resources/spool/index-test-src-1_closed.json b/notification/src/test/resources/spool/index-test-src-1_closed.json
new file mode 100644
index 0000000..1b8881e
--- /dev/null
+++ b/notification/src/test/resources/spool/index-test-src-1_closed.json
@@ -0,0 +1 @@
+{"id":"x","path":"x.log","line":10,"created":456,"writeCompleted":0,"doneCompleted":0,"lastSuccess":0,"failedAttempt":0,"status":"DONE"}
diff --git a/pom.xml b/pom.xml
index b924201..91fd593 100644
--- a/pom.xml
+++ b/pom.xml
@@ -697,6 +697,7 @@
         <javax.servlet.version>3.1.0</javax.servlet.version>
         <guava.version>25.1-jre</guava.version>
         <antlr4.version>4.7</antlr4.version>
+        <log4j.version>2.8</log4j.version>
 
         <!-- Needed for hooks -->
         <aopalliance.version>1.0</aopalliance.version>