You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by lu...@apache.org on 2018/09/27 23:17:16 UTC

asterixdb git commit: [ASTERIXDB-2453] Add Improved Constant Merge Policy

Repository: asterixdb
Updated Branches:
  refs/heads/master 8bbf08131 -> 1aeb8b6ce


[ASTERIXDB-2453] Add Improved Constant Merge Policy

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- The current constant merge policy is unusable because of its high
merge cost, i.e., O(N*N) where N is the number of flushes. This patch
replaces the previous constant merge policy with a more efficient policy
that still enforces a maximum number of components but greatly lowers
the merge cost.
- Extend AbstractLSMIndex with a method to return the total number of
flushes, based on the file name sequencer. This is required by the new
policy.

Change-Id: Ie5f83a4d5fdd3f036b823c906df1760f5110ae0a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2971
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1aeb8b6c
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1aeb8b6c
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1aeb8b6c

Branch: refs/heads/master
Commit: 1aeb8b6cef3a50d7639749c96538ec2cecaaad9c
Parents: 8bbf081
Author: luochen01 <cl...@uci.edu>
Authored: Thu Sep 27 14:36:06 2018 -0700
Committer: Luo Chen <cl...@uci.edu>
Committed: Thu Sep 27 16:16:57 2018 -0700

----------------------------------------------------------------------
 .../am/lsm/common/impls/AbstractLSMIndex.java   |  15 ++
 .../lsm/common/impls/ConstantMergePolicy.java   | 161 ++++++++++---------
 2 files changed, 102 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1aeb8b6c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 9199fbb..d3133ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -20,12 +20,14 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -901,4 +903,17 @@ public abstract class AbstractLSMIndex implements ILSMIndex {
     protected abstract ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException;
 
     protected abstract ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException;
+
+    public Optional<Long> getLatestDiskComponentSequence() {
+        if (diskComponents.isEmpty()) {
+            return Optional.empty();
+        }
+        final ILSMDiskComponent latestDiskComponent = diskComponents.get(0);
+        final Set<String> diskComponentPhysicalFiles = latestDiskComponent.getLSMComponentPhysicalFiles();
+        final String fileName = diskComponentPhysicalFiles.stream().findAny()
+                .orElseThrow(() -> new IllegalStateException("Disk component without any physical files"));
+        return Optional
+                .of(IndexComponentFileReference.of(Paths.get(fileName).getFileName().toString()).getSequenceEnd());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1aeb8b6c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index c642d82..5b71770 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -19,107 +19,114 @@
 
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.common.IIndexAccessParameters;
 
 public class ConstantMergePolicy implements ILSMMergePolicy {
     private int numComponents;
 
+    private int[][] binomial;
+
     @Override
     public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException {
-        List<ILSMDiskComponent> immutableComponents = index.getDiskComponents();
-
-        if (!areComponentsMergable(immutableComponents)) {
+        List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
+        if (!areComponentsReadableWritableState(immutableComponents)) {
             return;
         }
-
         if (fullMergeIsRequested) {
-            IIndexAccessParameters iap =
-                    new IndexAccessParameters(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            ILSMIndexAccessor accessor = index.createAccessor(iap);
-            accessor.scheduleFullMerge();
-        } else if (immutableComponents.size() >= numComponents) {
             ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            accessor.scheduleMerge(immutableComponents);
+            accessor.scheduleFullMerge();
+            return;
         }
+        scheduleMerge(index);
     }
 
-    @Override
-    public void configure(Map<String, String> properties) {
-        numComponents = Integer.parseInt(properties.get(ConstantMergePolicyFactory.NUM_COMPONENTS));
+    private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException {
+        Optional<Long> latestSeq = ((AbstractLSMIndex) index).getLatestDiskComponentSequence();
+        if (!latestSeq.isPresent()) {
+            return false;
+        }
+        // sequence number starts from 0, and thus latestSeq + 1 gives the number of flushes
+        int numFlushes = latestSeq.get().intValue() + 1;
+        List<ILSMDiskComponent> immutableComponents = new ArrayList<>(index.getDiskComponents());
+        Collections.reverse(immutableComponents);
+        int size = immutableComponents.size();
+        int depth = 0;
+        while (treeDepth(depth) < numFlushes) {
+            depth++;
+        }
+        int mergedIndex =
+                binomialIndex(depth, Math.min(depth, numComponents) - 1, numFlushes - treeDepth(depth - 1) - 1);
+        if (mergedIndex == size - 1) {
+            return false;
+        }
+        long mergeSize = 0;
+        List<ILSMDiskComponent> mergableComponents = new ArrayList<ILSMDiskComponent>();
+        for (int i = mergedIndex; i < immutableComponents.size(); i++) {
+            mergeSize = mergeSize + immutableComponents.get(i).getComponentSize();
+            mergableComponents.add(immutableComponents.get(i));
+        }
+        Collections.reverse(mergableComponents);
+        ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        accessor.scheduleMerge(mergableComponents);
+        return true;
     }
 
-    @Override
-    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException {
-        // see PrefixMergePolicy.isMergeLagging() for the rationale behind this code.
-
-        /**
-         * case 1.
-         * if totalImmutableCommponentCount < threshold,
-         * merge operation is not lagged ==> return false.
-         * case 2.
-         * if a) totalImmutableCommponentCount >= threshold && b) there is an ongoing merge,
-         * merge operation is lagged. ==> return true.
-         * case 3. *SPECIAL CASE*
-         * if a) totalImmutableCommponentCount >= threshold && b) there is *NO* ongoing merge,
-         * merge operation is lagged. ==> *schedule a merge operation* and then return true.
-         * This is a special case that requires to schedule a merge operation.
-         * Otherwise, all flush operations will be hung.
-         * This case can happen in a following situation:
-         * The system may crash when
-         * condition 1) the mergableImmutableCommponentCount >= threshold and
-         * condition 2) merge operation is going on.
-         * After the system is recovered, still condition 1) is true.
-         * If there are flush operations in the same dataset partition after the recovery,
-         * all these flush operations may not proceed since there is no ongoing merge and
-         * there will be no new merge either in this situation.
-         */
-
-        List<ILSMDiskComponent> immutableComponents = index.getDiskComponents();
-        int totalImmutableComponentCount = immutableComponents.size();
-
-        // [case 1]
-        if (totalImmutableComponentCount < numComponents) {
-            return false;
+    private int treeDepth(int d) {
+        if (d < 0) {
+            return 0;
         }
+        return treeDepth(d - 1) + binomialChoose(d + Math.min(d, numComponents) - 1, d);
+    }
 
-        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+    private int binomialIndex(int d, int h, int t) {
+        if (t < 0 || t > binomialChoose(d + h, h)) {
+            throw new IllegalStateException("Illegal binomial values");
+        }
+        if (t == 0) {
+            return 0;
+        } else if (t < binomialChoose(d + h - 1, h)) {
+            return binomialIndex(d - 1, h, t);
+        }
+        return binomialIndex(d, h - 1, t - binomialChoose(d + h - 1, h)) + 1;
+    }
 
-        // here, implicitly (totalImmutableComponentCount >= numComponents) is true by passing case 1.
-        if (isMergeOngoing) {
-            // [case 2]
-            return true;
-        } else {
-            // [case 3]
-            // schedule a merge operation after making sure that all components are mergable
-            if (!areComponentsMergable(immutableComponents)) {
-                throw new IllegalStateException();
+    private int binomialChoose(int n, int k) {
+        if (k < 0 || k > n) {
+            return 0;
+        }
+        if (k == 0 || k == n) {
+            return 1;
+        }
+        // For efficiency, binomial is persisted to avoid re-computations for every merge
+        if (binomial == null || binomial.length <= n) {
+            binomial = new int[n + 1][n + 1];
+            for (int r = 0; r <= n; r++) {
+                for (int c = 0; c <= r; c++) {
+                    if (c == 0 || c == r) {
+                        binomial[r][c] = 1;
+                    } else {
+                        binomial[r][c] = binomial[r - 1][c - 1] + binomial[r - 1][c];
+                    }
+                }
             }
-            ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            accessor.scheduleMerge(immutableComponents);
-            return true;
         }
+        return binomial[n][k];
     }
 
-    /**
-     * checks whether all given components are mergable or not
-     *
-     * @param immutableComponents
-     * @return true if all components are mergable, false otherwise.
-     */
-    private boolean areComponentsMergable(List<ILSMDiskComponent> immutableComponents) {
+    private boolean areComponentsReadableWritableState(List<ILSMDiskComponent> immutableComponents) {
         for (ILSMComponent c : immutableComponents) {
             if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
                 return false;
@@ -128,12 +135,18 @@ public class ConstantMergePolicy implements ILSMMergePolicy {
         return true;
     }
 
-    /**
-     * This method returns whether there is an ongoing merge operation or not by checking
-     * each component state of given components.
-     *
-     * @return true if there is an ongoing merge operation, false otherwise.
-     */
+    @Override
+    public void configure(Map<String, String> properties) {
+        numComponents = Integer.parseInt(properties.get(ConstantMergePolicyFactory.NUM_COMPONENTS));
+    }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException {
+        // TODO: for now, we simply block the ingestion when there is an ongoing merge
+        List<ILSMDiskComponent> immutableComponents = index.getDiskComponents();
+        return isMergeOngoing(immutableComponents);
+    }
+
     private boolean isMergeOngoing(List<ILSMDiskComponent> immutableComponents) {
         int size = immutableComponents.size();
         for (int i = 0; i < size; i++) {