You are viewing a plain-text version of this content. The canonical link for it was a hyperlink labeled "here", which is not preserved in this plain-text version.
Posted to issues@hugegraph.apache.org by "javeme (via GitHub)" <gi...@apache.org> on 2023/09/16 11:33:27 UTC

[GitHub] [incubator-hugegraph] javeme commented on a diff in pull request #2312: feat(core): support batch+parallel edges traverse

javeme commented on code in PR #2312:
URL: https://github.com/apache/incubator-hugegraph/pull/2312#discussion_r1327947375


##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -43,55 +43,55 @@ public final class Consumers<V> {
     public static final int THREADS = 4 + CoreOptions.CPUS / 4;
     public static final int QUEUE_WORKER_SIZE = 1000;
     public static final long CONSUMER_WAKE_PERIOD = 1;
+    private static final Object QUEUE_END = new VWrapper(null);

Review Comment:
   Can we remove the VWrapper class and just use `new Object()` instead?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -317,61 +318,63 @@ public void accept(EdgeId edgeId) {
         }
     }
 
-    public abstract class EdgeItConsumer<T, E> implements Consumer<Iterator<T>> {
-        private final Consumer<E> parseConsumer;
+    public abstract class EdgesConsumer<T, E> implements Consumer<Iterator<T>> {
+
+        private final Consumer<E> consumer;
         private final long capacity;
 
-        public EdgeItConsumer(Consumer<E> parseConsumer, long capacity) {
-            this.parseConsumer = parseConsumer;
+        public EdgesConsumer(Consumer<E> consumer, long capacity) {
+            this.consumer = consumer;
             this.capacity = capacity;
         }
 
-        protected abstract Iterator<E> prepare(Iterator<T> it);
+        protected abstract Iterator<E> prepare(Iterator<T> iter);
 
         @Override
-        public void accept(Iterator<T> edges) {
-            Iterator<E> ids = prepare(edges);
+        public void accept(Iterator<T> edgeIter) {
+            Iterator<E> ids = prepare(edgeIter);
             long counter = 0;
             while (ids.hasNext()) {
                 if (Thread.currentThread().isInterrupted()) {
                     LOG.warn("Consumer is Interrupted");
                     break;
                 }
                 counter++;
-                parseConsumer.accept(ids.next());
+                consumer.accept(ids.next());

Review Comment:
   Keep member access in the `this.consumer` style.



##########
hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java:
##########
@@ -279,7 +279,7 @@ private void shutdownOpenPool(ExecutorService openPool) {
         this.useSessions();
         try {
             Consumers.executeOncePerThread(openPool, OPEN_POOL_THREADS,
-                                           this::closeSessions);
+                                           this::closeSessions, 5, TimeUnit.SECONDS);

Review Comment:
   ditto



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -251,8 +251,10 @@ public ExecutorService executor() {
 
     public static void executeOncePerThread(ExecutorService executor,
                                             int totalThreads,
-                                            Runnable callback)
-            throws InterruptedException {
+                                            Runnable callback,
+                                            int invokeTimeout,
+                                            TimeUnit unit)

Review Comment:
   Can we fix the unit to seconds and remove the `unit` parameter?



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -180,30 +217,26 @@ private <K> long consumersStart(Iterator<Iterator<K>> iterator, String name,
             } catch (Throwable e) {
                 throw Consumers.wrapException(e);
             } finally {
-                CloseableIterator.closeIterator(iterator);
+                CloseableIterator.closeIterator(sources);
             }
         }
         return total;
     }
 
-    private <K> Consumers<Iterator<K>> getConsumers(Consumer<Iterator<K>> consumer,
-                                                    int queueWorkerSize,
-                                                    AtomicBoolean done,
-                                                    ExecutorService executor) {
-        Consumers<Iterator<K>> consumers;
-        consumers = new Consumers<>(executor,
-                                    consumer,
-                                    null,
-                                    e -> done.set(true),
-                                    queueWorkerSize);
-        return consumers;
+    private <K> Consumers<Iterator<K>> buildConsumers(Consumer<Iterator<K>> consumer,
+                                                      int queueWorkerSize,

Review Comment:
   Also rename this parameter to `queueSizePerWorker`.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java:
##########
@@ -47,7 +48,7 @@ public final class TaskManager {
                                "server-info-db-worker-%d";
     public static final String TASK_SCHEDULER = "task-scheduler-%d";
 
-    protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
+    static final long SCHEDULE_PERIOD = 1000L; // unit ms

Review Comment:
   Prefer to keep the `protected` modifier.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -141,33 +142,69 @@ protected <K> long traverse(Iterator<K> iterator, Consumer<K> consumer,
         return total;
     }
 
-    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
-                                     Consumer<Iterator<K>> consumer,
-                                     String name, int queueWorkerSize) {
-        if (!iterator.hasNext()) {
+    protected void traverseIdsByBfs(Iterator<Id> vertices,
+                                    Directions dir,
+                                    Id label,
+                                    long degree,
+                                    long capacity,
+                                    Consumer<EdgeId> consumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : Collections.singletonList(label);
+        OneStepEdgeIterConsumer edgeIterConsumer = new OneStepEdgeIterConsumer(consumer, capacity);
+
+        EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree);
+
+        // parallel out-of-order execution
+        this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-ite-edge", 1);
+    }
+
+    protected void traverseIdsByBfs(Iterator<Id> vertices,
+                                    Steps steps,
+                                    long capacity,
+                                    Consumer<Edge> consumer) {
+        StepsEdgeIterConsumer edgeIterConsumer =
+                new StepsEdgeIterConsumer(consumer, capacity, steps);
+
+        EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices,
+                                                                  steps.direction(),
+                                                                  steps.edgeLabels(),
+                                                                  steps.degree());
+
+        // get Iterator<Iterator<edges>> from Iterator<Query>
+        EdgesIterator edgeIter = new EdgesIterator(queryIterator);
+
+        // parallel out-of-order execution
+        this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-ite-edge", 1);

Review Comment:
   Prefer "traverse-bfs-steps" over "traverse-ite-edge" as the name here.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java:
##########
@@ -134,7 +135,7 @@ private void closeTaskTx(HugeGraphParams graph) {
                 graph.closeTx();
             } else {
                 Consumers.executeOncePerThread(this.taskExecutor, totalThreads,
-                                               graph::closeTx);
+                                               graph::closeTx, 5, TimeUnit.SECONDS);

Review Comment:
   Should the timeout be 30s? Also, define a constant `TX_CLOSE_TIMEOUT` for it.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -175,4 +278,103 @@ public List<V> getValues(K key) {
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> {
+
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count = new AtomicInteger(0);
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (limit != NO_LIMIT && count.get() >= limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (sourceV.equals(targetV)) {

Review Comment:
   Expect a `this.xx` prefix; we need to keep member access in the `this.sourceV` style.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -317,61 +318,63 @@ public void accept(EdgeId edgeId) {
         }
     }
 
-    public abstract class EdgeItConsumer<T, E> implements Consumer<Iterator<T>> {
-        private final Consumer<E> parseConsumer;
+    public abstract class EdgesConsumer<T, E> implements Consumer<Iterator<T>> {
+
+        private final Consumer<E> consumer;
         private final long capacity;
 
-        public EdgeItConsumer(Consumer<E> parseConsumer, long capacity) {
-            this.parseConsumer = parseConsumer;
+        public EdgesConsumer(Consumer<E> consumer, long capacity) {
+            this.consumer = consumer;
             this.capacity = capacity;
         }
 
-        protected abstract Iterator<E> prepare(Iterator<T> it);
+        protected abstract Iterator<E> prepare(Iterator<T> iter);
 
         @Override
-        public void accept(Iterator<T> edges) {
-            Iterator<E> ids = prepare(edges);
+        public void accept(Iterator<T> edgeIter) {
+            Iterator<E> ids = prepare(edgeIter);
             long counter = 0;
             while (ids.hasNext()) {
                 if (Thread.currentThread().isInterrupted()) {
                     LOG.warn("Consumer is Interrupted");
                     break;
                 }
                 counter++;
-                parseConsumer.accept(ids.next());
+                consumer.accept(ids.next());
             }
             long total = edgeIterCounter.addAndGet(counter);
-            // 按批次检测 capacity,以提高性能
+            // traverse by batch & improve performance
             if (this.capacity != NO_LIMIT && total >= capacity) {
                 throw new Consumers.StopExecution("reach capacity");
             }
         }
     }
 
-    public class CapacityConsumer extends EdgeItConsumer<Edge, EdgeId> {
-        public CapacityConsumer(Consumer<EdgeId> parseConsumer, long capacity) {
-            super(parseConsumer, capacity);
+    public class OneStepEdgeIterConsumer extends EdgesConsumer<Edge, EdgeId> {
+
+        public OneStepEdgeIterConsumer(Consumer<EdgeId> consumer, long capacity) {
+            super(consumer, capacity);
         }
 
         @Override
-        protected Iterator<EdgeId> prepare(Iterator<Edge> edges) {
-            return new MapperIterator<>(edges, (e) -> ((HugeEdge) e).id());
+        protected Iterator<EdgeId> prepare(Iterator<Edge> edgeIter) {
+            return new MapperIterator<>(edgeIter, (e) -> ((HugeEdge) e).id());
         }
     }
 
-    public class CapacityConsumerWithStep extends EdgeItConsumer<Edge, Edge> {
+    public class StepsEdgeIterConsumer extends EdgesConsumer<Edge, Edge> {
+
         private final Steps steps;
 
-        public CapacityConsumerWithStep(Consumer<Edge> parseConsumer, long capacity,
-                                        Steps steps) {
-            super(parseConsumer, capacity);
+        public StepsEdgeIterConsumer(Consumer<Edge> consumer, long capacity,
+                                     Steps steps) {
+            super(consumer, capacity);
             this.steps = steps;
         }
 
         @Override
-        protected Iterator<Edge> prepare(Iterator<Edge> edges) {
-            return edgesOfVertexStep(edges, steps);
+        protected Iterator<Edge> prepare(Iterator<Edge> edgeIter) {
+            return edgesOfVertexStep(edgeIter, steps);

Review Comment:
   Use `this.steps` here (keep the `this.xx` member-access style).



##########
hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java:
##########
@@ -168,23 +205,36 @@ public void provide(V v) throws Throwable {
             throw this.throwException();
         } else {
             try {
-                this.queue.put(v);
+                this.queue.put(new VWrapper<>(v));
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while enqueue", e);
+            }
+        }
+    }
+
+    private void putQueueEnd() {
+        if (this.executor != null) {
+            try {
+                this.queue.put((VWrapper<V>) QUEUE_END);
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while enqueue", e);
             }
         }
     }
 
     public void await() throws Throwable {
-        this.ending = true;
         if (this.executor == null) {
             // call done() directly if without thread pool
             this.done();
         } else {
             try {
+                putQueueEnd();
                 this.latch.await();
             } catch (InterruptedException e) {
                 String error = "Interrupted while waiting for consumers";
+                for (Future f: this.runningFutures) {

Review Comment:
   Expect a space before the colon: `Future f :`.



##########
hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java:
##########
@@ -141,33 +142,69 @@ protected <K> long traverse(Iterator<K> iterator, Consumer<K> consumer,
         return total;
     }
 
-    protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
-                                     Consumer<Iterator<K>> consumer,
-                                     String name, int queueWorkerSize) {
-        if (!iterator.hasNext()) {
+    protected void traverseIdsByBfs(Iterator<Id> vertices,
+                                    Directions dir,
+                                    Id label,
+                                    long degree,
+                                    long capacity,
+                                    Consumer<EdgeId> consumer) {
+        List<Id> labels =
+                label == null ? Collections.emptyList() : Collections.singletonList(label);
+        OneStepEdgeIterConsumer edgeIterConsumer = new OneStepEdgeIterConsumer(consumer, capacity);
+
+        EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree);
+
+        // parallel out-of-order execution
+        this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-ite-edge", 1);

Review Comment:
   Prefer "traverse-bfs-step" over "traverse-ite-edge" as the name here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@hugegraph.apache.org
For additional commands, e-mail: issues-help@hugegraph.apache.org