You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by va...@apache.org on 2023/04/23 07:57:19 UTC

[incubator-hugegraph] branch zy_dev created (now 9fc12c1f0)

This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a change to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


      at 9fc12c1f0 chore: single task and batch consume remove left index task

This branch includes the following new commits:

     new 9fc12c1f0 chore: single task and batch consume remove left index task

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-hugegraph] 01/01: chore: single task and batch consume remove left index task

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vaughn pushed a commit to branch zy_dev
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 9fc12c1f0abcfbe487a12812ca368bfc2b0aad53
Author: vaughn <va...@apache.org>
AuthorDate: Sun Apr 23 15:56:59 2023 +0800

    chore: single task and batch consume remove left index task
---
 .../backend/tx/GraphIndexTransaction.java          | 158 ++++++++++++++++++---
 1 file changed, 138 insertions(+), 20 deletions(-)

diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
index f4ff8b32e..0b832e78c 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
@@ -27,10 +27,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import groovy.lang.Tuple2;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hugegraph.backend.page.PageIds;
 import org.apache.hugegraph.backend.page.PageState;
@@ -95,6 +102,8 @@ import org.apache.hugegraph.util.NumericUtil;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import javax.security.auth.callback.Callback;
+
 public class GraphIndexTransaction extends AbstractTransaction {
 
     public static final String START_SYMBOL = "(";
@@ -1717,22 +1726,79 @@ public class GraphIndexTransaction extends AbstractTransaction {
         }
     }
 
+    /** Queues (query, element) pairs and keeps at most one RemoveLeftIndexJob scheduled for them. */
+    public static class RemoveLeftIndexJobHelper {
+        /** Capacity of the pending queue; add() logs a warning and drops the pair once it is full. */
+        public static final int CAPACITY = 2000;
+
+        private final BlockingQueue<Tuple2<ConditionQuery, HugeElement>> pendingQueue =
+                             new ArrayBlockingQueue<>(CAPACITY);
+        // INIT: no job claimed by reSchedule(); EXECUTE: a job has been claimed until consumeComplete()
+        private final AtomicReference<State> state;
+        private final HugeGraph graph;
+
+        enum State { INIT, EXECUTE }
+
+        private RemoveLeftIndexJobHelper(HugeGraph graph) {
+            this.graph = graph; // was never assigned, so reSchedule() built its job with a null graph
+            this.state = new AtomicReference<>(State.INIT);
+        }
+
+        /** Enqueue a pair (silently ignored if either argument is null), then try to schedule a job. */
+        public void add(ConditionQuery query, HugeElement element) {
+            if (query == null || element == null) {
+                return;
+            }
+
+            if (!this.pendingQueue.offer(new Tuple2<>(query, element))) {
+                LOG.warn("The pending queue of RemoveLeftIndexJob is full");
+                return;
+            }
+            this.reSchedule();
+        }
+
+        /** Release the job slot (EXECUTE -> INIT); handed to the job as its completion callback. */
+        public void consumeComplete() {
+            this.state.compareAndSet(State.EXECUTE, State.INIT);
+        }
+
+        /** Schedule a job if the slot is free (INIT -> EXECUTE); the slot is released again on failure. */
+        public void reSchedule() {
+            if (this.state.compareAndSet(State.INIT, State.EXECUTE)) {
+                try {
+                    RemoveLeftIndexJob job = new RemoveLeftIndexJob(pendingQueue, this::consumeComplete, this::reSchedule);
+                    EphemeralJobBuilder.of(this.graph)
+                            .name("batch-remove-left-index")
+                            .job(job)
+                            .schedule();
+                } catch (Throwable e) {
+                    LOG.warn("Failed to schedule RemoveLeftIndexJob", e);
+                    this.state.compareAndSet(State.EXECUTE, State.INIT);
+                }
+            }
+        }
+    }
+
     public static class RemoveLeftIndexJob extends EphemeralJob<Object> {
 
         private static final String REMOVE_LEFT_INDEX = "remove_left_index";
+        public static final int MAX_CONSUME_COUNT = 1000;
 
-        private final ConditionQuery query;
-        private final HugeElement element;
         private GraphIndexTransaction tx;
-        private Set<ConditionQuery.LeftIndex> leftIndexes;
 
-        private RemoveLeftIndexJob(ConditionQuery query, HugeElement element) {
-            E.checkArgumentNotNull(query, "query");
-            E.checkArgumentNotNull(element, "element");
-            this.query = query;
-            this.element = element;
-            this.tx = null;
-            this.leftIndexes = query.getLeftIndexOfElement(element.id());
+        private Queue<Tuple2<ConditionQuery, HugeElement>> queue; // pending (query, element) pairs polled by execute()
+        private Runnable completeCallback; // run by execute() when it stops consuming the queue
+        private Runnable scheduler; // run by execute() if the queue is still non-empty after it stopped
+        /** Create a job over the given queue and callbacks; each argument goes through E.checkArgumentNotNull. */
+        private RemoveLeftIndexJob(Queue<Tuple2<ConditionQuery, HugeElement>> queue,
+                                   Runnable completeCallback,
+                                   Runnable scheduler) {
+            E.checkArgumentNotNull(queue, "The queue can't be null");
+            E.checkArgumentNotNull(completeCallback, "The callback can't be null");
+            E.checkArgumentNotNull(scheduler, "The scheduler can't be null");
+            this.queue = queue;
+            this.completeCallback = completeCallback;
+            this.scheduler = scheduler;
         }
 
         @Override
@@ -1742,10 +1808,61 @@ public class GraphIndexTransaction extends AbstractTransaction {
 
         @Override
         public Object execute() {
-            this.tx = this.element.schemaLabel().system() ?
-                      this.params().systemTransaction().indexTransaction() :
-                      this.params().graphTransaction().indexTransaction();
-            return this.removeIndexLeft(this.query, this.element);
+            boolean stop = false;
+            List<Tuple2<ConditionQuery, HugeElement>> systemElements = new ArrayList<>();
+            List<Tuple2<ConditionQuery, HugeElement>> graphElements = new ArrayList<>();
+            final int pageSize = 100;
+            int count = 0;
+            int consumeCount = 0;
+            while (!stop) {
+
+                while (!this.queue.isEmpty() && (systemElements.size() + graphElements.size()) < pageSize) {
+                    Tuple2<ConditionQuery, HugeElement> query2Element = this.queue.poll();
+                    if (query2Element.getSecond().schemaLabel().system()) {
+                        systemElements.add(query2Element);
+                    } else {
+                        graphElements.add(query2Element);
+                    }
+
+                    consumeCount ++;
+                }
+
+                if (!systemElements.isEmpty()) {
+                    this.tx = this.params().systemTransaction().indexTransaction();
+                    for (Tuple2<ConditionQuery, HugeElement> query2Element : systemElements) {
+                        try {
+                            count += this.removeIndexLeft(query2Element.getFirst(), query2Element.getSecond());
+                        } catch (Throwable e) {
+                            LOG.warn("Failed to remove left index for system element {}", query2Element.getSecond().id(), e);
+                        }
+                    }
+                    this.tx.commit();
+                    systemElements.clear();
+                }
+
+                if (!graphElements.isEmpty()) {
+                    this.tx = this.params().graphTransaction().indexTransaction();
+                    for (Tuple2<ConditionQuery, HugeElement> query2Element : graphElements) {
+                        try {
+                            count += this.removeIndexLeft(query2Element.getFirst(), query2Element.getSecond());
+                        } catch (Throwable e) {
+                            LOG.warn("Failed to remove left index for graph element {}", query2Element.getSecond().id(), e);
+                        }
+                    }
+                    this.tx.commit();
+                    graphElements.clear();
+                }
+
+                if (this.queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT) {
+                    this.completeCallback.run();
+                    stop = true;
+                    if (!this.queue.isEmpty()) {
+                        this.scheduler.run();
+                    }
+                }
+            }
+
+            return count;
         }
 
         protected long removeIndexLeft(ConditionQuery query,
@@ -1776,24 +1893,25 @@ public class GraphIndexTransaction extends AbstractTransaction {
             long sCount = 0;
             for (ConditionQuery cq: ConditionQueryFlatten.flatten(query)) {
                 // Process range index
-                rCount += this.processRangeIndexLeft(cq, element);
+                rCount += this.processRangeIndexLeft(query, cq, element);
                 // Process secondary index or search index
                 sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
             }
-            this.tx.commit();
             return rCount + sCount;
         }
 
         private long processRangeIndexLeft(ConditionQuery query,
+                                           ConditionQuery flattenQuery,
                                            HugeElement element) {
             long count = 0;
-            if (this.leftIndexes == null) {
+            Set<ConditionQuery.LeftIndex> leftIndexes = query.getLeftIndexOfElement(element.id());
+            if (CollectionUtils.isEmpty(leftIndexes)) {
                 return count;
             }
 
-            for (ConditionQuery.LeftIndex leftIndex : this.leftIndexes) {
+            for (ConditionQuery.LeftIndex leftIndex : leftIndexes) {
                 Set<Object> indexValues = leftIndex.indexFieldValues();
-                IndexLabel indexLabel = this.findMatchedIndexLabel(query,
+                IndexLabel indexLabel = this.findMatchedIndexLabel(flattenQuery,
                                                                    leftIndex);
                 assert indexLabel != null;
 
@@ -1807,7 +1925,7 @@ public class GraphIndexTransaction extends AbstractTransaction {
                 }
             }
             // Remove LeftIndex after constructing remove job
-            this.query.removeElementLeftIndex(element.id());
+            query.removeElementLeftIndex(element.id());
             this.tx.commit();
             return count;
         }