You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2017/09/28 20:44:32 UTC

asterixdb git commit: [ASTERIXDB-2104][STO] Optimization for Correlated Policy

Repository: asterixdb
Updated Branches:
  refs/heads/master ebde95d63 -> 56d972fa6


[ASTERIXDB-2104][STO] Optimization for Correlated Policy

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Previously, we introduced an optimization to the prefix merge policy
by ensuring component size grows exponentially after merging. This patch
applies the same optimization to CorrelatedMergePolicy
- A little refactoring of PrefixMergePolicy and CorrelatedMergePolicy
for code reuse

Change-Id: Icd84c77f1e2c34c410508fbc9de70156224ce932
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2019
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/56d972fa
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/56d972fa
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/56d972fa

Branch: refs/heads/master
Commit: 56d972fa6dd89769d6ba1220d49e2d54d48657d3
Parents: ebde95d
Author: luochen01 <cl...@uci.edu>
Authored: Thu Sep 21 16:26:58 2017 -0700
Committer: Luo Chen <cl...@uci.edu>
Committed: Thu Sep 28 13:42:27 2017 -0700

----------------------------------------------------------------------
 .../context/CorrelatedPrefixMergePolicy.java    | 221 +++----------------
 .../CorrelatedPrefixMergePolicyFactory.java     |  17 +-
 .../CorrelatedPrefixMergePolicyTest.java        |  68 +-----
 .../am/lsm/common/impls/PrefixMergePolicy.java  |  18 +-
 4 files changed, 41 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index d28b991..5bb9d49 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -22,26 +22,20 @@ package org.apache.asterix.common.context;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicy;
 
-public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
-
-    private long maxMergableComponentSize;
-    private int maxToleranceComponentCount;
+public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy {
 
     private final IDatasetLifecycleManager datasetLifecycleManager;
     private final int datasetId;
@@ -61,37 +55,9 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
         // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt.  If so, schedule
         // a merge all of the current candidates into a new single component.
 
-        if (fullMergeIsRequested) {
-            //full merge request is handled by each index separately, since it is possible that
-            //when a primary index wants to send full merge requests for all secondaries,
-            //one secondary index is being merged and the request cannot be scheduled
-            List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
-            if (!areComponentsReadableUnwritableState(immutableComponents)) {
-                return;
-            }
-
-            ILSMIndexAccessor accessor =
-                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFullMerge(index.getIOOperationCallback());
-            return;
-        }
-
-        if (!index.isPrimaryIndex()) {
-            return;
-        }
-        List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
-        if (!areComponentsReadableUnwritableState(immutableComponents)) {
-            return;
+        if (fullMergeIsRequested || index.isPrimaryIndex()) {
+            super.diskComponentAdded(index, fullMergeIsRequested);
         }
-        scheduleMerge(index);
-    }
-
-    @Override
-    public void configure(Map<String, String> properties) {
-        maxMergableComponentSize =
-                Long.parseLong(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE));
-        maxToleranceComponentCount =
-                Integer.parseInt(properties.get(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT));
     }
 
     /**
@@ -101,106 +67,33 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
      */
     @Override
     public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException {
-        /**
-         * case 1.
-         * if mergableImmutableCommponentCount < threshold,
-         * merge operation is not lagged ==> return false.
-         * case 2.
-         * if a) mergableImmutableCommponentCount >= threshold && b) there is an ongoing merge,
-         * merge operation is lagged. ==> return true.
-         * case 3. *SPECIAL CASE*
-         * if a) mergableImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
-         * merge operation is lagged. ==> *schedule a merge operation* and then return true.
-         * This is a special case that requires to schedule a merge operation.
-         * Otherwise, all flush operations will be hung.
-         * This case can happen in a following situation:
-         * The system may crash when
-         * condition 1) the mergableImmutableCommponentCount >= threshold and
-         * condition 2) merge operation is going on.
-         * After the system is recovered, still condition 1) is true.
-         * If there are flush operations in the same dataset partition after the recovery,
-         * all these flush operations may not proceed since there is no ongoing merge and
-         * there will be no new merge either in this situation.
-         * Note for case 3, we only let the primary index to schedule merge operations on behalf
-         * of all indexes.
-         */
-
-        List<ILSMDiskComponent> immutableComponents = index.getImmutableComponents();
-        int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
-
-        // [case 1]
-        if (mergableImmutableComponentCount < maxToleranceComponentCount) {
-            return false;
-        }
-
-        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
-
-        if (isMergeOngoing) {
-            // [case 2]
-            return true;
-        }
-
         if (index.isPrimaryIndex()) {
-            // [case 3]
-            // make sure that all components are of READABLE_UNWRITABLE state.
-            if (!areComponentsReadableUnwritableState(immutableComponents)) {
-                throw new IllegalStateException();
-            }
-            // schedule a merge operation
-            boolean isMergeTriggered = scheduleMerge(index);
-            if (!isMergeTriggered) {
-                throw new IllegalStateException();
-            }
-            return true;
+            return super.isMergeLagging(index);
         } else {
-            //[case 3]
-            //if the index is secondary then ignore the merge request (since the merge should be
-            //triggered by the primary) and here we simply treat it as not lagged.
             return false;
         }
+
     }
 
-    private boolean scheduleMerge(ILSMIndex index) throws HyracksDataException {
+    @Override
+    protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
         List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
+        // Reverse the components order so that we look at components from oldest to newest.
         Collections.reverse(immutableComponents);
 
-        long totalSize = 0;
-        int startIndex = -1;
-
-        int numComponents = immutableComponents.size();
-
-        for (int i = 0; i < numComponents; i++) {
-            ILSMComponent c = immutableComponents.get(i);
-            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
-            if (componentSize > maxMergableComponentSize || ((ILSMDiskComponent) c).getComponentId().notFound()) {
-                startIndex = i;
-                totalSize = 0;
-                continue;
-            }
-            totalSize += componentSize;
-            boolean isLastComponent = i + 1 == numComponents ? true : false;
-            if (totalSize > maxMergableComponentSize
-                    || (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
-                //merge disk components from startIndex+1 to i
-                long minID = Long.MAX_VALUE;
-                long maxID = Long.MIN_VALUE;
-                for (int j = startIndex + 1; j <= i; j++) {
-                    ILSMDiskComponentId id = immutableComponents.get(j).getComponentId();
-                    if (minID > id.getMinId()) {
-                        minID = id.getMinId();
-                    }
-                    if (maxID < id.getMaxId()) {
-                        maxID = id.getMaxId();
-                    }
-                }
-                Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
-                int partition = getIndexPartition(index, indexInfos);
-                triggerScheduledMerge(minID, maxID, indexInfos.stream().filter(info -> info.getPartition() == partition)
-                        .collect(Collectors.toSet()));
-                return true;
-            }
+        Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents);
+        if (mergeableIndexes == null) {
+            //nothing to merge
+            return false;
         }
-        return false;
+        long minID = immutableComponents.get(mergeableIndexes.getLeft()).getComponentId().getMinId();
+        long maxID = immutableComponents.get(mergeableIndexes.getRight()).getComponentId().getMaxId();
+
+        Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
+        int partition = getIndexPartition(index, indexInfos);
+        triggerScheduledMerge(minID, maxID,
+                indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
+        return true;
     }
 
     /**
@@ -224,15 +117,13 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
             List<ILSMDiskComponent> mergableComponents = new ArrayList<>();
             for (ILSMDiskComponent component : immutableComponents) {
                 ILSMDiskComponentId id = component.getComponentId();
-                if (!id.notFound()) {
-                    if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
-                        mergableComponents.add(component);
-                    }
-                    if (id.getMaxId() < minID) {
-                        //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
-                        //if the component.maxID < minID, we can safely skip the rest disk components in the list
-                        break;
-                    }
+                if (id.getMinId() >= minID && id.getMaxId() <= maxID) {
+                    mergableComponents.add(component);
+                }
+                if (id.getMaxId() < minID) {
+                    //disk components are ordered from latest (with largest IDs) to oldest (with smallest IDs)
+                    //if the component.maxID < minID, we can safely skip the rest disk components in the list
+                    break;
                 }
             }
             ILSMIndexAccessor accessor =
@@ -241,62 +132,6 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
         }
     }
 
-    /**
-     * This method returns the number of mergable components among the given list
-     * of immutable components that are ordered from the latest component to order ones. A caller
-     * need to make sure the order in the list.
-     *
-     * @param immutableComponents
-     * @return the number of mergable component
-     * @throws HyracksDataException
-     */
-    private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents)
-            throws HyracksDataException {
-        int count = 0;
-        for (ILSMComponent c : immutableComponents) {
-            long componentSize = ((ILSMDiskComponent) c).getComponentSize();
-            //stop when the first non-mergable component is found.
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize > maxMergableComponentSize
-                    || ((ILSMDiskComponent) c).getComponentId().notFound()) {
-                break;
-            }
-            ++count;
-        }
-        return count;
-    }
-
-    /**
-     * This method returns whether there is an ongoing merge operation or not by checking
-     * each component state of given components.
-     *
-     * @param immutableComponents
-     * @return true if there is an ongoing merge operation, false otherwise.
-     */
-    private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
-        int size = immutableComponents.size();
-        for (int i = 0; i < size; i++) {
-            if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * checks whether all given components are of READABLE_UNWRITABLE state
-     *
-     * @param immutableComponents
-     * @return true if all components are of READABLE_UNWRITABLE state, false otherwise.
-     */
-    private boolean areComponentsReadableUnwritableState(List<ILSMDiskComponent> immutableComponents) {
-        for (ILSMComponent c : immutableComponents) {
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
-                return false;
-            }
-        }
-        return true;
-    }
-
     private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
         for (IndexInfo info : indexInfos) {
             if (info.getIndex() == index) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index 3c141bc..25242d4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -19,27 +19,19 @@
 
 package org.apache.asterix.common.context;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.PrefixMergePolicyFactory;
 
-public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactory {
+public class CorrelatedPrefixMergePolicyFactory extends PrefixMergePolicyFactory {
 
     private static final long serialVersionUID = 1L;
     public static final String NAME = "correlated-prefix";
     public static final String KEY_DATASET_ID = "datasetId";
-    public static final String KEY_MAX_COMPONENT_SIZE = "max-mergable-component-size";
-    public static final String KEY_MAX_COMPONENT_COUNT = "max-tolerance-component-count";
-
-    private static final String[] SET_VALUES = new String[] { KEY_MAX_COMPONENT_SIZE, KEY_MAX_COMPONENT_COUNT };
-    private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES));
 
     @Override
     public String getName() {
@@ -47,11 +39,6 @@ public class CorrelatedPrefixMergePolicyFactory implements ILSMMergePolicyFactor
     }
 
     @Override
-    public Set<String> getPropertiesNames() {
-        return PROPERTIES_NAMES;
-    }
-
-    @Override
     public ILSMMergePolicy createMergePolicy(Map<String, String> configuration, INCServiceContext ctx) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index e36f4a8..c18ecc2 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
-import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -154,69 +153,6 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase {
     }
 
     @Test
-    public void testPrimaryNotFound() {
-        try {
-            List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
-                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
-                    new LSMDiskComponentId(10, 19));
-            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
-            IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
-
-            List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
-                    new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24));
-            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
-            IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
-
-            ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
-
-            policy.diskComponentAdded(secondary.getIndex(), false);
-            Assert.assertTrue(resultPrimaryIDs.isEmpty());
-            Assert.assertTrue(resultSecondaryIDs.isEmpty());
-
-            policy.diskComponentAdded(primary.getIndex(), false);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(40, 50), new LSMDiskComponentId(30, 35),
-                    new LSMDiskComponentId(25, 29)), resultPrimaryIDs);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29)),
-                    resultSecondaryIDs);
-        } catch (HyracksDataException e) {
-            Assert.fail(e.getMessage());
-        }
-    }
-
-    @Test
-    public void testSecondaryNotFound() {
-        try {
-            List<ILSMDiskComponentId> primaryComponentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
-                    new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29), new LSMDiskComponentId(20, 24),
-                    new LSMDiskComponentId(10, 19));
-            List<ILSMDiskComponentId> resultPrimaryIDs = new ArrayList<>();
-            IndexInfo primary = mockIndex(true, primaryComponentIDs, resultPrimaryIDs, 0);
-
-            List<ILSMDiskComponentId> secondaryComponentIDs = Arrays.asList(new LSMDiskComponentId(30, 35),
-                    new LSMDiskComponentId(ILSMDiskComponentId.NOT_FOUND, ILSMDiskComponentId.NOT_FOUND),
-                    new LSMDiskComponentId(20, 24));
-            List<ILSMDiskComponentId> resultSecondaryIDs = new ArrayList<>();
-            IndexInfo secondary = mockIndex(false, secondaryComponentIDs, resultSecondaryIDs, 0);
-
-            ILSMMergePolicy policy = mockMergePolicy(primary, secondary);
-
-            policy.diskComponentAdded(secondary.getIndex(), false);
-            Assert.assertTrue(resultPrimaryIDs.isEmpty());
-            Assert.assertTrue(resultSecondaryIDs.isEmpty());
-
-            policy.diskComponentAdded(primary.getIndex(), false);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(25, 29),
-                    new LSMDiskComponentId(20, 24), new LSMDiskComponentId(10, 19)), resultPrimaryIDs);
-            Assert.assertEquals(Arrays.asList(new LSMDiskComponentId(30, 35), new LSMDiskComponentId(20, 24)),
-                    resultSecondaryIDs);
-
-        } catch (HyracksDataException e) {
-            Assert.fail(e.getMessage());
-        }
-    }
-
-    @Test
     public void testMultiPartition() {
         try {
             List<ILSMDiskComponentId> componentIDs = Arrays.asList(new LSMDiskComponentId(40, 50),
@@ -251,8 +187,8 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase {
 
     private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
         Map<String, String> properties = new HashMap<>();
-        properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_COUNT, String.valueOf(MAX_COMPONENT_COUNT));
-        properties.put(CorrelatedPrefixMergePolicyFactory.KEY_MAX_COMPONENT_SIZE, String.valueOf(MAX_COMPONENT_SIZE));
+        properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
+        properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
 
         Set<IndexInfo> indexInfos = new HashSet<>();
         for (IndexInfo info : indexes) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/56d972fa/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 329e4fb..6878910 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -35,8 +35,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 
 public class PrefixMergePolicy implements ILSMMergePolicy {
-    private long maxMergableComponentSize;
-    private int maxToleranceComponentCount;
+    protected long maxMergableComponentSize;
+    protected int maxToleranceComponentCount;
 
     /**
      * This parameter is used to avoid merging a big component with a sequence of small components.
@@ -178,7 +178,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
      * @param immutableComponents
      * @return true if there is an ongoing merge operation, false otherwise.
      */
-    private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
+    protected boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
         int size = immutableComponents.size();
         for (int i = 0; i < size; i++) {
             if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING) {
@@ -196,7 +196,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
      * @param immutableComponents
      * @return the number of mergable component
      */
-    private int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) {
+    protected int getMergableImmutableComponentCount(List<ILSMDiskComponent> immutableComponents) {
         Pair<Integer, Integer> mergableIndexes = getMergableComponentsIndex(immutableComponents);
         return mergableIndexes == null ? 0 : mergableIndexes.getRight() - mergableIndexes.getLeft() + 1;
     }
@@ -207,7 +207,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
      * @param immutableComponents
      * @return true if all components are of READABLE_UNWRITABLE state, false otherwise.
      */
-    private boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) {
+    protected boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) {
         for (ILSMComponent c : immutableComponents) {
             if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
                 return false;
@@ -224,21 +224,21 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
      * @throws HyracksDataException
      * @throws IndexException
      */
-    private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
+    protected boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
         List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getImmutableComponents());
         // Reverse the components order so that we look at components from oldest to newest.
         Collections.reverse(immutableComponents);
 
         Pair<Integer, Integer> mergeableIndexes = getMergableComponentsIndex(immutableComponents);
         if (mergeableIndexes != null) {
-            scheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight());
+            triggerScheduleMerge(index, immutableComponents, mergeableIndexes.getLeft(), mergeableIndexes.getRight());
             return true;
         } else {
             return false;
         }
     }
 
-    private void scheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex,
+    private void triggerScheduleMerge(ILSMIndex index, List<ILSMDiskComponent> immutableComponents, int startIndex,
             int endIndex) throws HyracksDataException {
         List<ILSMDiskComponent> mergableComponents =
                 new ArrayList<>(immutableComponents.subList(startIndex, endIndex + 1));
@@ -265,7 +265,7 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
      * @return a pair of indexes indicating the start and end position of the sequence
      *         otherwise, return null if no sequence is found
      */
-    private Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) {
+    protected Pair<Integer, Integer> getMergableComponentsIndex(List<ILSMDiskComponent> immutableComponents) {
         int numComponents = immutableComponents.size();
         for (int i = 0; i < numComponents; i++) {
             if (immutableComponents.get(i).getComponentSize() > maxMergableComponentSize