You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2019/09/30 08:31:50 UTC

svn commit: r1867754 - /jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java

Author: tomekr
Date: Mon Sep 30 08:31:50 2019
New Revision: 1867754

URL: http://svn.apache.org/viewvc?rev=1867754&view=rev
Log:
OAK-8654: Introduce auto-retry in the SegmentStoreMigrator

Modified:
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java

Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java?rev=1867754&r1=1867753&r2=1867754&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java Mon Sep 30 08:31:50 2019
@@ -27,6 +27,7 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
 import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType;
 import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
@@ -54,6 +55,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class SegmentStoreMigrator implements Closeable  {
 
@@ -85,17 +87,17 @@ public class SegmentStoreMigrator implem
     }
 
     public void migrate() throws IOException, ExecutionException, InterruptedException {
-        migrateJournal();
-        migrateGCJournal();
-        migrateManifest();
+        runWithRetry(() -> migrateJournal(), 16, 5);
+        runWithRetry(() -> migrateGCJournal(), 16, 5);
+        runWithRetry(() -> migrateManifest(), 16, 5);
         migrateArchives();
     }
 
-    private void migrateJournal() throws IOException {
+    private Void migrateJournal() throws IOException {
         log.info("{}/journal.log -> {}", sourceName, targetName);
         if (!source.getJournalFile().exists()) {
             log.info("No journal at {}; skipping.", sourceName);
-            return;
+            return null;
         }
         List<String> journal = new ArrayList<>();
 
@@ -118,9 +120,10 @@ public class SegmentStoreMigrator implem
                 writer.writeLine(line);
             }
         }
+        return null;
     }
 
-    private void migrateGCJournal() throws IOException {
+    private Void migrateGCJournal() throws IOException {
         log.info("{}/gc.log -> {}", sourceName, targetName);
         GCJournalFile targetGCJournal = target.getGCJournalFile();
         if (appendMode) {
@@ -129,16 +132,18 @@ public class SegmentStoreMigrator implem
         for (String line : source.getGCJournalFile().readLines()) {
             targetGCJournal.writeLine(line);
         }
+        return null;
     }
 
-    private void migrateManifest() throws IOException {
+    private Void migrateManifest() throws IOException {
         log.info("{}/manifest -> {}", sourceName, targetName);
         if (!source.getManifestFile().exists()) {
             log.info("No manifest at {}; skipping.", sourceName);
-            return;
+            return null;
         }
         Properties manifest = source.getManifestFile().load();
         target.getManifestFile().save(manifest);
+        return null;
     }
 
     private void migrateArchives() throws IOException, ExecutionException, InterruptedException {
@@ -170,15 +175,14 @@ public class SegmentStoreMigrator implem
         }
     }
 
-    private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer)
-            throws ExecutionException, InterruptedException, IOException {
+    private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws ExecutionException, InterruptedException, IOException {
         List<Future<Segment>> futures = new ArrayList<>();
         for (SegmentArchiveEntry entry : reader.listSegments()) {
-            futures.add(executor.submit(() -> {
+            futures.add(executor.submit(() -> runWithRetry(() -> {
                 Segment segment = new Segment(entry);
                 segment.read(reader);
                 return segment;
-            }));
+            }, 16, 5)));
         }
 
         for (Future<Segment> future : futures) {
@@ -203,9 +207,47 @@ public class SegmentStoreMigrator implem
         }
     }
 
+    private static <T> T runWithRetry(Producer<T> producer, int maxAttempts, int intervalSec) throws IOException {
+        IOException ioException = null;
+        RepositoryNotReachableException repoNotReachableException = null;
+        for (int i = 0; i < maxAttempts; i++) {
+            try {
+                return producer.produce();
+            } catch (IOException e) {
+                log.error("Can't execute the operation. Retrying (attempt {})", i, e);
+                ioException = e;
+            } catch (RepositoryNotReachableException e) {
+                log.error("Can't execute the operation. Retrying (attempt {})", i, e);
+                repoNotReachableException = e;
+            }
+            try {
+                Thread.sleep(intervalSec * 1000);
+            } catch (InterruptedException e) {
+                log.error("Interrupted", e);
+            }
+        }
+        if (ioException != null) {
+            throw ioException;
+        } else if (repoNotReachableException != null) {
+            throw repoNotReachableException;
+        } else {
+            throw new IllegalStateException();
+        }
+    }
+
     @Override
     public void close() throws IOException {
         executor.shutdown();
+        try {
+            while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @FunctionalInterface
+    private interface Producer<T> {
+        T produce() throws IOException;
     }
 
     private static class Segment {