You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/05/21 03:35:23 UTC

[3/5] asterixdb git commit: [NO ISSUE][STO] Misc Storage Fixes and Improvements

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 681669f..52c8962 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -19,7 +19,10 @@
 
 package org.apache.asterix.common.context;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,16 +40,19 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
-public class PrimaryIndexOperationTracker extends BaseOperationTracker {
+public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
     private final int partition;
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
@@ -54,6 +60,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
     private final ILSMComponentIdGenerator idGenerator;
     private boolean flushOnExit = false;
     private boolean flushLogCreated = false;
+    private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>();
 
     public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
@@ -107,6 +114,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
         }
 
         if (needsFlush || flushOnExit) {
+            flushOnExit = false;
             // make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering
             // them until the current flush is scheduled.
             LSMComponentId primaryId = null;
@@ -143,7 +151,6 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
                 throw new IllegalStateException("Primary index not found in dataset " + dsInfo.getDatasetID());
             }
             LogRecord logRecord = new LogRecord();
-            flushOnExit = false;
             if (dsInfo.isDurable()) {
                 /*
                  * Generate a FLUSH log.
@@ -182,16 +189,36 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
             Map<String, Object> flushMap = new HashMap<>();
             flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
             flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
-            for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
-                ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-                accessor.getOpContext().setParameters(flushMap);
-                accessor.scheduleFlush();
+            synchronized (scheduledFlushes) {
+                for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                    accessor.getOpContext().setParameters(flushMap);
+                    ILSMIOOperation flush = accessor.scheduleFlush();
+                    scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush);
+                    flush.addCompleteListener(this);
+                }
             }
         } finally {
             flushLogCreated = false;
         }
     }
 
+    @Override
+    public void completed(ILSMIOOperation operation) {
+        synchronized (scheduledFlushes) {
+            scheduledFlushes.remove(operation.getTarget().getRelativePath());
+        }
+    }
+
+    public List<FlushOperation> getScheduledFlushes() {
+        synchronized (scheduledFlushes) {
+            Collection<FlushOperation> scheduled = scheduledFlushes.values();
+            List<FlushOperation> flushes = new ArrayList<FlushOperation>(scheduled.size());
+            flushes.addAll(scheduled);
+            return flushes;
+        }
+    }
+
     public int getNumActiveOperations() {
         return numActiveOperations.get();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index 99ab2d0..71d16f7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import java.util.List;
+
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
 public class LSMIndexUtil {
@@ -41,4 +44,15 @@ public class LSMIndexUtil {
             }
         }
     }
+
+    public static void waitFor(List<? extends ILSMIOOperation> ioOperations) throws HyracksDataException {
+        for (int i = 0; i < ioOperations.size(); i++) {
+            try {
+                ioOperations.get(i).sync();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 50f5906..ea53d68 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -20,19 +20,21 @@
 package org.apache.asterix.common.ioopcallbacks;
 
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -45,13 +47,19 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
 public class LSMIOOperationCallback implements ILSMIOOperationCallback {
+    private static final Logger LOGGER = LogManager.getLogger();
     public static final String KEY_FLUSH_LOG_LSN = "FlushLogLsn";
     public static final String KEY_NEXT_COMPONENT_ID = "NextComponentId";
+    public static final String KEY_FLUSHED_COMPONENT_ID = "FlushedComponentId";
     private static final String KEY_FIRST_LSN = "FirstLsn";
     private static final MutableArrayValueReference KEY_METADATA_FLUSH_LOG_LSN =
             new MutableArrayValueReference(KEY_FLUSH_LOG_LSN.getBytes());
@@ -83,8 +91,11 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
         if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
             return;
         }
-        if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
-            Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
+        if (operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+            Map<String, Object> map = operation.getParameters();
+            putComponentIdIntoMetadata(operation.getNewComponent(), (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID));
+        } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
+            Map<String, Object> map = operation.getParameters();
             putLSNIntoMetadata(operation.getNewComponent(), (Long) map.get(KEY_FLUSH_LOG_LSN));
             putComponentIdIntoMetadata(operation.getNewComponent(),
                     ((FlushOperation) operation).getFlushingComponent().getId());
@@ -104,16 +115,64 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
         if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
             return;
         }
-        if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
-            Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
-            final Long lsn = (Long) map.get(KEY_FLUSH_LOG_LSN);
-            final Optional<String> componentFile =
-                    operation.getNewComponent().getLSMComponentPhysicalFiles().stream().findAny();
-            if (componentFile.isPresent()) {
-                final ResourceReference ref = ResourceReference.of(componentFile.get());
-                final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
-                indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn);
+        if (operation.getIOOpertionType() != LSMIOOperationType.LOAD
+                && operation.getAccessor().getOpContext().getOperation() == IndexOperation.DELETE_COMPONENTS) {
+            deleteComponentsFromCheckpoint(operation);
+        } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH
+                || operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+            addComponentToCheckpoint(operation);
+        }
+    }
+
+    private void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+        // will always update the checkpoint file even if no new component was created
+        FileReference target = operation.getTarget();
+        Map<String, Object> map = operation.getParameters();
+        final Long lsn =
+                operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? (Long) map.get(KEY_FLUSH_LOG_LSN) : 0L;
+        final LSMComponentId id = (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID);
+        final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+        final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
+        indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn, id.getMaxId());
+    }
+
+    private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+        // Components were deleted: a flush needs no action, but a merge must update the checkpoint file
+        if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
+            // Get component id of the last disk component
+            LSMComponentId mostRecentComponentId =
+                    getMostRecentComponentId(operation.getAccessor().getOpContext().getComponentsToBeMerged());
+            // Update the checkpoint file
+            FileReference target = operation.getTarget();
+            final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+            indexCheckpointManagerProvider.get(ref).setLastComponentId(mostRecentComponentId.getMaxId());
+        } else if (operation.getIOOpertionType() != LSMIOOperationType.FLUSH) {
+            throw new IllegalStateException("Unexpected IO operation: " + operation.getIOOpertionType());
+        }
+    }
+
+    private LSMComponentId getMostRecentComponentId(Collection<ILSMDiskComponent> deletedComponents)
+            throws HyracksDataException {
+        // must sync on opTracker to ensure list of components doesn't change
+        synchronized (lsmIndex.getOperationTracker()) {
+            List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents();
+            if (diskComponents.isEmpty()) {
+                LOGGER.log(Level.INFO, "There are no disk components");
+                return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
+            }
+            if (deletedComponents.contains(diskComponents.get(diskComponents.size() - 1))) {
+                LOGGER.log(Level.INFO, "All disk components have been deleted");
+                return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
             }
+            int mostRecentComponentIndex = 0;
+            for (int i = 0; i < diskComponents.size(); i++) {
+                if (!deletedComponents.contains(diskComponents.get(i))) {
+                    break;
+                }
+                mostRecentComponentIndex++;
+            }
+            ILSMDiskComponent mostRecentDiskComponent = diskComponents.get(mostRecentComponentIndex);
+            return (LSMComponentId) mostRecentDiskComponent.getId();
         }
     }
 
@@ -153,14 +212,6 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
         LSMComponentIdUtils.persist(componentId, newComponent.getMetadata());
     }
 
-    /**
-     * Used during the recovery process to force refresh the next component id
-     */
-    public void forceRefreshNextId(ILSMComponentId nextComponentId) {
-        componentIds.clear();
-        componentIds.add(nextComponentId);
-    }
-
     public synchronized void setFirstLsnForCurrentMemoryComponent(long firstLsn) {
         this.firstLsnForCurrentMemoryComponent = firstLsn;
         if (pendingFlushes == 0) {
@@ -195,9 +246,11 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
         dsInfo.declareActiveIOOperation();
         if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             pendingFlushes++;
+            FlushOperation flush = (FlushOperation) operation;
             Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
             Long flushLsn = (Long) map.get(KEY_FLUSH_LOG_LSN);
             map.put(KEY_FIRST_LSN, firstLsnForCurrentMemoryComponent);
+            map.put(KEY_FLUSHED_COMPONENT_ID, flush.getFlushingComponent().getId());
             componentIds.add((ILSMComponentId) map.get(KEY_NEXT_COMPONENT_ID));
             firstLsnForCurrentMemoryComponent = flushLsn; // Advance the first lsn for new component
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index b008f11..2c0872c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -27,31 +27,33 @@ public interface IIndexCheckpointManager {
     /**
      * Initializes the first checkpoint of an index with low watermark {@code lsn}
      *
+     * @param componentTimestamp
      * @param lsn
      * @throws HyracksDataException
      */
-    void init(long lsn) throws HyracksDataException;
+    void init(String componentTimestamp, long lsn) throws HyracksDataException;
 
     /**
-     * Called when a new LSM disk component is flushed. When called,  the index checkpoiint is updated
+     * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
      * with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
      *
      * @param componentTimestamp
      * @param lsn
      * @throws HyracksDataException
      */
-    void flushed(String componentTimestamp, long lsn) throws HyracksDataException;
+    void flushed(String componentTimestamp, long lsn, long componentId) throws HyracksDataException;
 
     /**
-     * Called when a new LSM disk component is replicated from master. When called,  the index checkpoiint is updated
+     * Called when a new LSM disk component is replicated from master. When called, the index checkpoint is updated
      * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
      * new low watermark.
      *
      * @param componentTimestamp
      * @param masterLsn
+     * @param componentId
      * @throws HyracksDataException
      */
-    void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException;
+    void replicated(String componentTimestamp, long masterLsn, long componentId) throws HyracksDataException;
 
     /**
      * Called when a flush log is received and replicated from master. The mapping between
@@ -89,13 +91,37 @@ public interface IIndexCheckpointManager {
      * Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
      *
      * @return the index last valid component timestamp
+     * @throws HyracksDataException
      */
-    Optional<String> getValidComponentTimestamp();
+    Optional<String> getValidComponentTimestamp() throws HyracksDataException;
 
     /**
      * Gets the number of valid checkpoints the index has.
      *
      * @return the number of valid checkpoints
+     * @throws HyracksDataException
+     */
+    int getCheckpointCount() throws HyracksDataException;
+
+    /**
+     * @return the latest checkpoint
+     * @throws HyracksDataException
+     */
+    IndexCheckpoint getLatest() throws HyracksDataException;
+
+    /**
+     * Advance the last valid component timestamp. Used for replicated bulk-loaded components
+     *
+     * @param timeStamp
+     * @throws HyracksDataException
+     */
+    void advanceValidComponentTimestamp(String timeStamp) throws HyracksDataException;
+
+    /**
+     * Set the last component id. Used during recovery or after component delete
+     *
+     * @param componentId
+     * @throws HyracksDataException
      */
-    int getCheckpointCount();
+    void setLastComponentId(long componentId) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 6e845e1..73d3122 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -35,24 +36,28 @@ public class IndexCheckpoint {
     private long id;
     private String validComponentTimestamp;
     private long lowWatermark;
+    private long lastComponentId;
     private Map<Long, Long> masterNodeFlushMap;
 
-    public static IndexCheckpoint first(long lowWatermark) {
+    public static IndexCheckpoint first(String lastComponentTimestamp, long lowWatermark) {
         IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
         firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
         firstCheckpoint.lowWatermark = lowWatermark;
-        firstCheckpoint.validComponentTimestamp = null;
+        firstCheckpoint.validComponentTimestamp = lastComponentTimestamp;
+        firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
         firstCheckpoint.masterNodeFlushMap = new HashMap<>();
         return firstCheckpoint;
     }
 
-    public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp) {
+    public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp,
+            long lastComponentId) {
         if (lowWatermark < latest.getLowWatermark()) {
             throw new IllegalStateException("Low watermark should always be increasing");
         }
         IndexCheckpoint next = new IndexCheckpoint();
         next.id = latest.getId() + 1;
         next.lowWatermark = lowWatermark;
+        next.lastComponentId = lastComponentId;
         next.validComponentTimestamp = validComponentTimestamp;
         next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
         // remove any lsn from the map that wont be used anymore
@@ -72,6 +77,10 @@ public class IndexCheckpoint {
         return lowWatermark;
     }
 
+    public long getLastComponentId() {
+        return lastComponentId;
+    }
+
     public Map<Long, Long> getMasterNodeFlushMap() {
         return masterNodeFlushMap;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
new file mode 100644
index 0000000..1da5c9c
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+
+public interface IRecoveryManagerFactory {
+
+    /**
+     * Create the local recovery manager
+     *
+     * @param serviceCtx
+     *            the service context
+     * @param txnSubsystem
+     *            the transaction subsystem
+     * @return the recovery manager
+     */
+    IRecoveryManager createRecoveryManager(INCServiceContext serviceCtx, ITransactionSubsystem txnSubsystem);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 5fdb4e2..0c3b21d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.CRC32;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -271,6 +270,7 @@ public class LogRecord implements ILogRecord {
                 computeAndSetLogSize();
                 break;
             case LogType.WAIT:
+            case LogType.WAIT_FOR_FLUSHES:
                 computeAndSetLogSize();
                 break;
             case LogType.JOB_COMMIT:
@@ -462,6 +462,7 @@ public class LogRecord implements ILogRecord {
                 logSize = FLUSH_LOG_SIZE;
                 break;
             case LogType.WAIT:
+            case LogType.WAIT_FOR_FLUSHES:
                 logSize = WAIT_LOG_SIZE;
                 break;
             case LogType.FILTER:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index f02b0de..2d76a11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -28,6 +28,7 @@ public class LogType {
     public static final byte WAIT = 6;
     public static final byte FILTER = 7;
     public static final byte MARKER = 8;
+    public static final byte WAIT_FOR_FLUSHES = 9;
 
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -37,6 +38,7 @@ public class LogType {
     private static final String STRING_WAIT = "WAIT";
     private static final String STRING_FILTER = "FILTER";
     private static final String STRING_MARKER = "MARKER";
+    private static final String STRING_WAIT_FOR_FLUSHES = "WAIT_FOR_FLUSHES";
     private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
 
     public static String toString(byte logType) {
@@ -53,6 +55,8 @@ public class LogType {
                 return STRING_FLUSH;
             case LogType.WAIT:
                 return STRING_WAIT;
+            case LogType.WAIT_FOR_FLUSHES:
+                return STRING_WAIT_FOR_FLUSHES;
             case LogType.FILTER:
                 return STRING_FILTER;
             case LogType.MARKER:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 6b13468..aa2c7af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,17 +19,17 @@
 package org.apache.asterix.common.utils;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.function.Function;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.storage.IndexPathElements;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.MappedFileSplit;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -143,4 +143,16 @@ public class StoragePathUtil {
     public static String getIndexNameFromPath(String path) {
         return Paths.get(path).getFileName().toString();
     }
+
+    /**
+     * Get the path of the index containing the passed reference
+     *
+     * @param ioManager the IO manager used to resolve the reference's relative path
+     * @param ref the resource reference whose relative path is resolved
+     * @return the resolved file location as a {@link Path}
+     * @throws HyracksDataException presumably when {@code ioManager} fails to resolve the path (TODO confirm)
+     */
+    public static Path getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
+        return ioManager.resolve(ref.getRelativePath().toString()).getFile().toPath();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 7af7b6e..29a2aa0 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.asterix.test.ioopcallbacks;
 
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -35,6 +38,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -60,6 +64,15 @@ public class LSMIOOperationCallbackTest extends TestCase {
      * 7. destroy
      */
 
+    private static final Format FORMATTER =
+            new SimpleDateFormat(AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT);
+
+    private static String getComponentFileName() {
+        Date date = new Date(); // current time
+        String ts = FORMATTER.format(date); // rendered with COMPONENT_TIMESTAMP_FORMAT
+        return ts + '_' + ts; // same timestamp on both sides of '_'
+    }
+
     @Test
     public void testNormalSequence() throws HyracksDataException {
         int numMemoryComponents = 2;
@@ -81,7 +94,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
         flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
         ILSMIndexAccessor firstAccessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
         firstAccessor.getOpContext().setParameters(flushMap);
-        FileReference firstTarget = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+        FileReference firstTarget = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
         LSMComponentFileReferences firstFiles = new LSMComponentFileReferences(firstTarget, firstTarget, firstTarget);
         FlushOperation firstFlush = new TestFlushOperation(firstAccessor, firstTarget, callback, indexId, firstFiles,
                 new LSMComponentId(0, 0));
@@ -97,7 +110,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
         flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
         ILSMIndexAccessor secondAccessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
         secondAccessor.getOpContext().setParameters(flushMap);
-        FileReference secondTarget = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+        FileReference secondTarget = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
         LSMComponentFileReferences secondFiles =
                 new LSMComponentFileReferences(secondTarget, secondTarget, secondTarget);
         FlushOperation secondFlush = new TestFlushOperation(secondAccessor, secondTarget, callback, indexId,
@@ -175,7 +188,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
             flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, expectedId);
             ILSMIndexAccessor accessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
             accessor.getOpContext().setParameters(flushMap);
-            FileReference target = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+            FileReference target = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
             LSMComponentFileReferences files = new LSMComponentFileReferences(target, target, target);
             FlushOperation flush =
                     new TestFlushOperation(accessor, target, callback, indexId, files, new LSMComponentId(0, 0));
@@ -210,7 +223,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
         IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
                 Mockito.mock(IIndexCheckpointManagerProvider.class);
         IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
-        Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong());
+        Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong(), Mockito.anyLong());
         Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
         return indexCheckpointManagerProvider;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
index 9ac143c..c2621d8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
@@ -141,7 +141,7 @@ public class TestLSMIndexAccessor implements ILSMIndexAccessor {
     }
 
     @Override
-    public void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType)
+    public void scheduleReplication(List<ILSMDiskComponent> diskComponents, LSMOperationType opType)
             throws HyracksDataException {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
index 79dc396..df4c093 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
@@ -18,8 +18,11 @@
  */
 package org.apache.asterix.external.operators;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.FileIndexTupleTranslator;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,7 +39,8 @@ import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 
 /**
@@ -74,10 +78,13 @@ public class ExternalFilesIndexCreateOperatorDescriptor extends AbstractSingleAc
                 // Open the index
                 indexHelper.open();
                 try {
-                    IIndex index = indexHelper.getIndexInstance();
+                    ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+                    Map<String, Object> parameters = new HashMap<>();
+                    parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
+                            LSMComponentId.DEFAULT_COMPONENT_ID);
                     // Create bulk loader
                     IIndexBulkLoader bulkLoader =
-                            index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false);
+                            index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false, parameters);
                     // Load files
                     for (ExternalFile file : files) {
                         bulkLoader.add(filesTupleTranslator.getTupleFromFile(file));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
index 4bc2867..3bada4a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
@@ -18,7 +18,9 @@
  */
 package org.apache.asterix.external.operators;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -69,9 +71,10 @@ public class ExternalFilesIndexModificationOperatorDescriptor extends AbstractSi
                 indexHelper.open();
                 IIndex index = indexHelper.getIndexInstance();
                 LSMTwoPCBTreeBulkLoader bulkLoader = null;
+                Map<String, Object> parameters = new HashMap<>();
                 try {
                     bulkLoader = (LSMTwoPCBTreeBulkLoader) ((ExternalBTree) index)
-                            .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size());
+                            .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), parameters);
                     // Load files
                     // The files must be ordered according to their numbers
                     for (ExternalFile file : files) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
index 573de5d..74bc0dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
@@ -18,12 +18,18 @@
  */
 package org.apache.asterix.external.operators;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 
 public class ExternalIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
 
@@ -43,4 +49,12 @@ public class ExternalIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOper
         super.open();
         ((ITwoPCIndex) index).setCurrentVersion(version);
     }
+
+    @Override
+    protected void initializeBulkLoader() throws HyracksDataException {
+        Map<String, Object> parameters = new HashMap<>(); // bulk-load parameters; only the flushed component id is set
+        parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
+        bulkLoader = ((ILSMIndex) index).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                parameters);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index aaca3f1..57e2917 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.external.operators;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.asterix.external.indexing.FilesIndexDescription;
 import org.apache.asterix.om.base.AMutableInt32;
@@ -60,8 +62,9 @@ public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOp
         try {
             writer.open();
             // Transactional BulkLoader
-            bulkLoader =
-                    ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length);
+            Map<String, Object> parameters = new HashMap<>();
+            bulkLoader = ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length,
+                    parameters);
             // Delete files
             for (int i = 0; i < deletedFiles.length; i++) {
                 fileNumber.setValue(deletedFiles[i]);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 84922cd..d4d601c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -22,16 +22,20 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Path;
 import java.util.Collection;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.common.LocalResource;
 
 /**
@@ -51,13 +55,27 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask {
                 appCtx.getIndexCheckpointManagerProvider();
         PersistentLocalResourceRepository resRepo =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        final IIOManager ioManager = appCtx.getIoManager();
         final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         for (LocalResource ls : partitionResources) {
-            final IIndexCheckpointManager indexCheckpointManager =
-                    indexCheckpointManagerProvider.get(DatasetResourceReference.of(ls));
+            DatasetResourceReference ref = DatasetResourceReference.of(ls);
+            final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
             indexCheckpointManager.delete();
-            indexCheckpointManager.init(currentLSN);
+            // Get most recent timestamp of existing files to avoid deletion
+            Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
+            String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+            if (files == null) {
+                throw HyracksDataException
+                        .create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
+            }
+            String mostRecentTimestamp = null;
+            for (String file : files) {
+                String nextTimeStamp = AbstractLSMIndexFileManager.getComponentEndTime(file);
+                mostRecentTimestamp = mostRecentTimestamp == null || nextTimeStamp.compareTo(mostRecentTimestamp) > 0
+                        ? nextTimeStamp : mostRecentTimestamp;
+            }
+            indexCheckpointManager.init(mostRecentTimestamp, currentLSN);
         }
         ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index 57474ef..a4f9b43 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -34,6 +34,7 @@ import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 
@@ -43,17 +44,21 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage
 public class MarkComponentValidTask implements IReplicaTask {
 
     private final long masterLsn;
+    private final long lastComponentId;
     private final String file;
 
-    public MarkComponentValidTask(String file, long masterLsn) {
+    public MarkComponentValidTask(String file, long masterLsn, long lastComponentId) {
         this.file = file;
+        this.lastComponentId = lastComponentId;
         this.masterLsn = masterLsn;
     }
 
     @Override
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
-            if (masterLsn > 0) {
+            if (masterLsn == IndexSynchronizer.BULKLOAD_LSN) {
+                updateBulkLoadedLastComponentTimestamp(appCtx);
+            } else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
                 ensureComponentLsnFlushed(appCtx);
             }
             // delete mask
@@ -65,6 +70,15 @@ public class MarkComponentValidTask implements IReplicaTask {
         }
     }
 
+    /** Advances the index checkpoint's valid component timestamp to the component end time derived from {@code file}. */
+    private void updateBulkLoadedLastComponentTimestamp(INcApplicationContext appCtx) throws HyracksDataException {
+        final ResourceReference indexRef = ResourceReference.of(file);
+        final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
+        final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
+        final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
+        indexCheckpointManager.advanceValidComponentTimestamp(componentEndTime);
+    }
+
     private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
             throws HyracksDataException, InterruptedException {
         final ResourceReference indexRef = ResourceReference.of(file);
@@ -82,7 +96,7 @@ public class MarkComponentValidTask implements IReplicaTask {
                 replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
             }
             final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
-            indexCheckpointManager.replicated(componentEndTime, masterLsn);
+            indexCheckpointManager.replicated(componentEndTime, masterLsn, lastComponentId);
         }
     }
 
@@ -97,6 +111,7 @@ public class MarkComponentValidTask implements IReplicaTask {
             final DataOutputStream dos = new DataOutputStream(out);
             dos.writeUTF(file);
             dos.writeLong(masterLsn);
+            dos.writeLong(lastComponentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -105,6 +120,7 @@ public class MarkComponentValidTask implements IReplicaTask {
     public static MarkComponentValidTask create(DataInput input) throws IOException {
         final String indexFile = input.readUTF();
         final long lsn = input.readLong();
-        return new MarkComponentValidTask(indexFile, lsn);
+        final long lastComponentId = input.readLong();
+        return new MarkComponentValidTask(indexFile, lsn, lastComponentId);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index ca0fcca..20663d1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -30,12 +30,12 @@ import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -98,7 +98,7 @@ public class ReplicateFileTask implements IReplicaTask {
         final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         indexCheckpointManager.delete();
-        indexCheckpointManager.init(currentLSN);
+        indexCheckpointManager.init(null, currentLSN);
         LOGGER.info(() -> "Checkpoint index: " + indexRef);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 0e07ac7..30a5595 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -37,12 +37,15 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class IndexSynchronizer {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    public static final long MERGE_LSN = -1;
+    public static final long BULKLOAD_LSN = -2;
     private final IReplicationJob job;
     private final INcApplicationContext appCtx;
 
@@ -91,7 +94,8 @@ public class IndexSynchronizer {
         final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica);
         job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
         // send mark component valid
-        MarkComponentValidTask markValidTask = new MarkComponentValidTask(indexFile, getReplicatedComponentLsn());
+        MarkComponentValidTask markValidTask =
+                new MarkComponentValidTask(indexFile, getReplicatedComponentLsn(), getReplicatedComponentId());
         ReplicationProtocol.sendTo(replica, markValidTask);
         ReplicationProtocol.waitForAck(replica);
         LOGGER.debug("Replicated component ({}) to replica {}", indexFile, replica);
@@ -118,6 +122,12 @@ public class IndexSynchronizer {
 
     private long getReplicatedComponentLsn() throws HyracksDataException {
         final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
+        if (indexReplJob.getLSMOpType() == LSMOperationType.MERGE) {
+            return MERGE_LSN;
+        } else if (indexReplJob.getLSMOpType() == LSMOperationType.LOAD) {
+            return BULKLOAD_LSN;
+        }
+
         if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
             return LSMIOOperationCallback.INVALID_LSN;
         }
@@ -126,4 +136,14 @@ public class IndexSynchronizer {
         return ((LSMIOOperationCallback) lsmIndex.getIOOperationCallback())
                 .getComponentLSN(ctx.getComponentsToBeReplicated());
     }
+
+    private long getReplicatedComponentId() throws HyracksDataException {
+        final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
+        if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
+            return -1L; // only FLUSH jobs report a component id
+        }
+        final ILSMIndexOperationContext ctx = indexReplJob.getLSMIndexOperationContext();
+        LSMComponentId id = (LSMComponentId) ctx.getComponentsToBeReplicated().get(0).getId();
+        return id.getMinId(); // min id of the first component to be replicated
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
index 2415556..4130490 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -18,10 +18,13 @@
  */
 package org.apache.asterix.runtime.operators;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -33,7 +36,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
 public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
@@ -71,29 +73,26 @@ public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorN
     @Override
     protected void initializeBulkLoader() throws HyracksDataException {
         ILSMIndex targetIndex = (ILSMIndex) index;
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
         if (usage.equals(BulkLoadUsage.LOAD)) {
-            // for a loaded dataset, we use the default Id 0 which is guaranteed to be smaller
-            // than Ids of all memory components
-
-            // TODO handle component Id for datasets loaded multiple times
-            // TODO move this piece of code to io operation callback
-            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
-            ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
-            LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata());
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                    parameters);
         } else {
             primaryIndexHelper.open();
             primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
             List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
-            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
             if (!primaryComponents.isEmpty()) {
-                // TODO move this piece of code to io operation callback
-                // Ideally, this should be done in io operation callback when a bulk load operation is finished
-                // However, currently we don't have an extensible callback mechanism to support this
                 ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
                         primaryComponents.get(primaryComponents.size() - 1).getId());
-                ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
-                LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata());
+                parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, bulkloadId);
+            } else {
+                parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
+                        LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID);
             }
+            bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+                    parameters);
+
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 6d9ec47..1029d6f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -19,7 +19,10 @@
 package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
 import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -32,9 +35,9 @@ import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
 
 /**
+ * Note: this operator node is only used with the correlated merge policy.
  * This operator node is used to bulk load incoming tuples (scanned from the primary index)
  * into multiple disk components of the secondary index.
  * Incoming tuple format:
@@ -182,14 +185,12 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
 
     private void loadNewComponent(int componentPos) throws HyracksDataException {
         endCurrentComponent();
-
         int numTuples = getNumDeletedTuples(componentPos);
         ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos);
-        componentBulkLoader =
-                (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false);
-        ILSMDiskComponent diskComponent = componentBulkLoader.getComponent();
-        // TODO move this piece of code to io operation callback
-        LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata());
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, primaryComponent.getId());
+        componentBulkLoader = (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples,
+                false, parameters);
     }
 
     private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index d61e9a0..7e42d14 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
@@ -82,7 +83,13 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat
                     // lock the dataset granule
                     lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
                     // flush the dataset synchronously
-                    datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+                    DatasetInfo datasetInfo = datasetLifeCycleManager.getDatasetInfo(datasetId.getId());
+                    // TODO: Remove the isOpen check and let it fail if flush is requested for a dataset that is closed
+                    synchronized (datasetLifeCycleManager) {
+                        if (datasetInfo.isOpen()) {
+                            datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+                        }
+                    }
                 } catch (ACIDException e) {
                     throw HyracksDataException.create(e);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 0375c30..e1963cb 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -186,7 +186,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         }
 
         resourceCache.put(resource.getPath(), resource);
-        indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(0);
+        indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0);
         //if replication enabled, send resource metadata info to remote nodes
         if (isReplicationEnabled) {
             createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
@@ -429,15 +429,20 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     }
 
     private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
-        final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
-        if (!validComponentTimestamp.isPresent()) {
-            // index doesn't have any components
-            return;
-        }
         final Format formatter = THREAD_LOCAL_FORMATTER.get();
-        final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
         final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
-        if (indexComponentFiles != null) {
+        if (indexComponentFiles == null) {
+            throw new IOException(index + " doesn't exist or an IO error occurred");
+        }
+        final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
+        if (!validComponentTimestamp.isPresent()) {
+            // the index has no valid component timestamp, so delete all of its component files
+            for (File componentFile : indexComponentFiles) {
+                LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
+                Files.delete(componentFile.toPath());
+            }
+        } else {
+            final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
             for (File componentFile : indexComponentFiles) {
                 // delete any file with startTime > validTimestamp
                 final String fileStartTimeStr =
@@ -505,7 +510,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
      * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
      * will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
      *
-     * @param componentFile any component file
+     * @param componentFile
+     *            any component file
      * @return The component id
      */
     public static String getComponentId(String componentFile) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 21268e5..4085fb4 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -89,8 +89,7 @@ public class LogBuffer implements ILogBuffer {
     public void append(ILogRecord logRecord, long appendLsn) {
         logRecord.writeLogRecord(appendBuffer);
 
-        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
-                && logRecord.getLogType() != LogType.WAIT) {
+        if (isLocalTransactionLog(logRecord)) {
             logRecord.getTxnCtx().setLastLSN(appendLsn);
         }
 
@@ -100,13 +99,10 @@ public class LogBuffer implements ILogBuffer {
                 LOGGER.info("append()| appendOffset: " + appendOffset);
             }
             if (logRecord.getLogSource() == LogSource.LOCAL) {
-                if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
-                        || logRecord.getLogType() == LogType.WAIT) {
+                if (syncPendingNonFlushLog(logRecord)) {
                     logRecord.isFlushed(false);
                     syncCommitQ.add(logRecord);
-                }
-                if (logRecord.getLogType() == LogType.FLUSH) {
-                    logRecord.isFlushed(false);
+                } else if (logRecord.getLogType() == LogType.FLUSH) {
                     flushQ.add(logRecord);
                 }
             } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
@@ -117,6 +113,16 @@ public class LogBuffer implements ILogBuffer {
         }
     }
 
+    private boolean syncPendingNonFlushLog(ILogRecord logRecord) {
+        return logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+                || logRecord.getLogType() == LogType.WAIT || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES;
+    }
+
+    private boolean isLocalTransactionLog(ILogRecord logRecord) {
+        return logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+                && logRecord.getLogType() != LogType.WAIT && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES;
+    }
+
     @Override
     public void setFileChannel(FileChannel fileChannel) {
         this.fileChannel = fileChannel;
@@ -231,7 +237,8 @@ public class LogBuffer implements ILogBuffer {
                         notifyJobTermination();
                     } else if (logRecord.getLogType() == LogType.FLUSH) {
                         notifyFlushTermination();
-                    } else if (logRecord.getLogType() == LogType.WAIT) {
+                    } else if (logRecord.getLogType() == LogType.WAIT
+                            || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
                         notifyWaitTermination();
                     }
                 } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 736de07..be227ec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -32,7 +32,6 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -134,13 +133,33 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
 
     @Override
     public void log(ILogRecord logRecord) {
-        if (logRecord.getLogType() == LogType.FLUSH) {
+        if (!logToFlushQueue(logRecord)) {
+            appendToLogTail(logRecord);
+        }
+    }
+
+    @SuppressWarnings("squid:S2445")
+    protected boolean logToFlushQueue(ILogRecord logRecord) {
+        // Local FLUSH and WAIT_FOR_FLUSHES logs are queued; remote FLUSH logs may not trigger a local flush
+        if ((logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL)
+                || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
+            logRecord.isFlushed(false);
             flushLogsQ.add(logRecord);
-            return;
+            if (logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
+                InvokeUtil.doUninterruptibly(() -> {
+                    synchronized (logRecord) {
+                        while (!logRecord.isFlushed()) {
+                            logRecord.wait();
+                        }
+                    }
+                });
+            }
+            return true;
         }
-        appendToLogTail(logRecord);
+        return false;
     }
 
+    @SuppressWarnings("squid:S2445")
     protected void appendToLogTail(ILogRecord logRecord) {
         syncAppendToLogTail(logRecord);
         if (waitForFlush(logRecord) && !logRecord.isFlushed()) {
@@ -161,7 +180,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
 
     synchronized void syncAppendToLogTail(ILogRecord logRecord) {
         if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
-                && logRecord.getLogType() != LogType.WAIT) {
+                && logRecord.getLogType() != LogType.WAIT && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES) {
             ITransactionContext txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
                 throw new ACIDException(

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 1e13883..d2e9629 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -40,9 +40,11 @@ public class LogManagerWithReplication extends LogManager {
         super(txnSubsystem);
     }
 
+    @SuppressWarnings("squid:S2445")
     @Override
     public void log(ILogRecord logRecord) {
-        boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+        boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT
+                && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES;
         if (shouldReplicate) {
             switch (logRecord.getLogType()) {
                 case LogType.ENTITY_COMMIT:
@@ -63,16 +65,12 @@ public class LogManagerWithReplication extends LogManager {
             }
         }
         logRecord.setReplicate(shouldReplicate);
-
-        //Remote flush logs do not need to be flushed separately since they may not trigger local flush
-        if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
-            flushLogsQ.add(logRecord);
-            return;
+        if (!logToFlushQueue(logRecord)) {
+            appendToLogTail(logRecord);
         }
-
-        appendToLogTail(logRecord);
     }
 
+    @SuppressWarnings("squid:S2445")
     @Override
     protected void appendToLogTail(ILogRecord logRecord) {
         syncAppendToLogTail(logRecord);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
index 16c0afa..c7be7eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
@@ -39,6 +39,6 @@ public interface ITwoPCIndexBulkLoader {
     /**
      * Abort the bulk modify op
      */
-    public void abort();
+    public void abort() throws HyracksDataException;
 
 }