You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2019/01/28 13:40:00 UTC

svn commit: r1852357 - /jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java

Author: adulceanu
Date: Mon Jan 28 13:40:00 2019
New Revision: 1852357

URL: http://svn.apache.org/viewvc?rev=1852357&view=rev
Log:
OAK-7977 - Add multi-threaded segment transfer to oak-run segment-copy
Simplified segments migration

Modified:
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java?rev=1852357&r1=1852356&r2=1852357&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java Mon Jan 28 13:40:00 2019
@@ -47,14 +47,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class SegmentStoreMigrator {
 
@@ -150,53 +146,19 @@ public class SegmentStoreMigrator {
     }
 
     private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer)
-            throws InterruptedException, ExecutionException {
-        BlockingDeque<Segment> readDeque = new LinkedBlockingDeque<>(READ_THREADS);
-        BlockingDeque<Segment> writeDeque = new LinkedBlockingDeque<>(READ_THREADS);
-        AtomicBoolean processingFinished = new AtomicBoolean(false);
-        AtomicBoolean exception = new AtomicBoolean(false);
-        List<Future<?>> futures = new ArrayList<>();
-        for (int i = 0; i < READ_THREADS; i++) {
+            throws ExecutionException, InterruptedException, IOException {
+        List<Future<Segment>> futures = new ArrayList<>();
+        for (SegmentArchiveEntry entry : reader.listSegments()) {
             futures.add(executor.submit(() -> {
-                try {
-                    while (!exception.get() && !(readDeque.isEmpty() && processingFinished.get())) {
-                        Segment segment = readDeque.poll(100, TimeUnit.MILLISECONDS);
-                        if (segment != null) {
-                            segment.read(reader);
-                        }
-                    }
-                    return null;
-                } catch (Exception e) {
-                    exception.set(true);
-                    throw e;
-                }
+                Segment segment = new Segment(entry);
+                segment.read(reader);
+                return segment;
             }));
         }
-        futures.add(executor.submit(() -> {
-            try {
-                while (!exception.get() && !(writeDeque.isEmpty() && processingFinished.get())) {
-                    Segment segment = writeDeque.poll(100, TimeUnit.MILLISECONDS);
-                    if (segment != null) {
-                        while (segment.data == null && !exception.get()) {
-                            Thread.sleep(10);
-                        }
-                        segment.write(writer);
-                    }
-                }
-                return null;
-            } catch (Exception e) {
-                exception.set(true);
-                throw e;
-            }
-        }));
-        for (SegmentArchiveEntry entry : reader.listSegments()) {
-            Segment segment = new Segment(entry);
-            readDeque.putLast(segment);
-            writeDeque.putLast(segment);
-        }
-        processingFinished.set(true);
-        for (Future<?> future : futures) {
-            future.get();
+
+        for (Future<Segment> future : futures) {
+            Segment segment = future.get();
+            segment.write(writer);
         }
     }