You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2019/01/28 13:40:00 UTC
svn commit: r1852357 -
/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
Author: adulceanu
Date: Mon Jan 28 13:40:00 2019
New Revision: 1852357
URL: http://svn.apache.org/viewvc?rev=1852357&view=rev
Log:
OAK-7977 - Add multi-threaded segment transfer to oak-run segment-copy
Simplified segments migration
Modified:
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java?rev=1852357&r1=1852356&r2=1852357&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java Mon Jan 28 13:40:00 2019
@@ -47,14 +47,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
public class SegmentStoreMigrator {
@@ -150,53 +146,19 @@ public class SegmentStoreMigrator {
}
private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer)
- throws InterruptedException, ExecutionException {
- BlockingDeque<Segment> readDeque = new LinkedBlockingDeque<>(READ_THREADS);
- BlockingDeque<Segment> writeDeque = new LinkedBlockingDeque<>(READ_THREADS);
- AtomicBoolean processingFinished = new AtomicBoolean(false);
- AtomicBoolean exception = new AtomicBoolean(false);
- List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < READ_THREADS; i++) {
+ throws ExecutionException, InterruptedException, IOException {
+ List<Future<Segment>> futures = new ArrayList<>();
+ for (SegmentArchiveEntry entry : reader.listSegments()) {
futures.add(executor.submit(() -> {
- try {
- while (!exception.get() && !(readDeque.isEmpty() && processingFinished.get())) {
- Segment segment = readDeque.poll(100, TimeUnit.MILLISECONDS);
- if (segment != null) {
- segment.read(reader);
- }
- }
- return null;
- } catch (Exception e) {
- exception.set(true);
- throw e;
- }
+ Segment segment = new Segment(entry);
+ segment.read(reader);
+ return segment;
}));
}
- futures.add(executor.submit(() -> {
- try {
- while (!exception.get() && !(writeDeque.isEmpty() && processingFinished.get())) {
- Segment segment = writeDeque.poll(100, TimeUnit.MILLISECONDS);
- if (segment != null) {
- while (segment.data == null && !exception.get()) {
- Thread.sleep(10);
- }
- segment.write(writer);
- }
- }
- return null;
- } catch (Exception e) {
- exception.set(true);
- throw e;
- }
- }));
- for (SegmentArchiveEntry entry : reader.listSegments()) {
- Segment segment = new Segment(entry);
- readDeque.putLast(segment);
- writeDeque.putLast(segment);
- }
- processingFinished.set(true);
- for (Future<?> future : futures) {
- future.get();
+
+ for (Future<Segment> future : futures) {
+ Segment segment = future.get();
+ segment.write(writer);
}
}