You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2019/09/30 08:31:50 UTC
svn commit: r1867754 -
/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
Author: tomekr
Date: Mon Sep 30 08:31:50 2019
New Revision: 1867754
URL: http://svn.apache.org/viewvc?rev=1867754&view=rev
Log:
OAK-8654: Introduce auto-retry in the SegmentStoreMigrator
Modified:
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
Modified: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java?rev=1867754&r1=1867753&r2=1867754&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java Mon Sep 30 08:31:50 2019
@@ -27,6 +27,7 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType;
import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
@@ -54,6 +55,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class SegmentStoreMigrator implements Closeable {
@@ -85,17 +87,17 @@ public class SegmentStoreMigrator implem
}
public void migrate() throws IOException, ExecutionException, InterruptedException {
- migrateJournal();
- migrateGCJournal();
- migrateManifest();
+ runWithRetry(() -> migrateJournal(), 16, 5);
+ runWithRetry(() -> migrateGCJournal(), 16, 5);
+ runWithRetry(() -> migrateManifest(), 16, 5);
migrateArchives();
}
- private void migrateJournal() throws IOException {
+ private Void migrateJournal() throws IOException {
log.info("{}/journal.log -> {}", sourceName, targetName);
if (!source.getJournalFile().exists()) {
log.info("No journal at {}; skipping.", sourceName);
- return;
+ return null;
}
List<String> journal = new ArrayList<>();
@@ -118,9 +120,10 @@ public class SegmentStoreMigrator implem
writer.writeLine(line);
}
}
+ return null;
}
- private void migrateGCJournal() throws IOException {
+ private Void migrateGCJournal() throws IOException {
log.info("{}/gc.log -> {}", sourceName, targetName);
GCJournalFile targetGCJournal = target.getGCJournalFile();
if (appendMode) {
@@ -129,16 +132,18 @@ public class SegmentStoreMigrator implem
for (String line : source.getGCJournalFile().readLines()) {
targetGCJournal.writeLine(line);
}
+ return null;
}
- private void migrateManifest() throws IOException {
+ private Void migrateManifest() throws IOException {
log.info("{}/manifest -> {}", sourceName, targetName);
if (!source.getManifestFile().exists()) {
log.info("No manifest at {}; skipping.", sourceName);
- return;
+ return null;
}
Properties manifest = source.getManifestFile().load();
target.getManifestFile().save(manifest);
+ return null;
}
private void migrateArchives() throws IOException, ExecutionException, InterruptedException {
@@ -170,15 +175,14 @@ public class SegmentStoreMigrator implem
}
}
- private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer)
- throws ExecutionException, InterruptedException, IOException {
+ private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws ExecutionException, InterruptedException, IOException {
List<Future<Segment>> futures = new ArrayList<>();
for (SegmentArchiveEntry entry : reader.listSegments()) {
- futures.add(executor.submit(() -> {
+ futures.add(executor.submit(() -> runWithRetry(() -> {
Segment segment = new Segment(entry);
segment.read(reader);
return segment;
- }));
+ }, 16, 5)));
}
for (Future<Segment> future : futures) {
@@ -203,9 +207,47 @@ public class SegmentStoreMigrator implem
}
}
+    /**
+     * Calls producer up to maxAttempts times, waiting intervalSec seconds between attempts.
+     * The failure of the final attempt is rethrown; an interrupt while waiting aborts the retries.
+     * @throws IOException if the last attempt failed with one, or if the wait between attempts was interrupted
+     */
+    private static <T> T runWithRetry(Producer<T> producer, int maxAttempts, int intervalSec) throws IOException {
+        if (maxAttempts < 1) {
+            throw new IllegalStateException("maxAttempts must be at least 1, was " + maxAttempts);
+        }
+        for (int attempt = 1; ; attempt++) {
+            try {
+                return producer.produce();
+            } catch (IOException | RepositoryNotReachableException e) {
+                if (attempt == maxAttempts) {
+                    throw e; // out of attempts: surface the most recent failure
+                }
+                log.error("Can't execute the operation. Retrying (attempt {})", attempt, e);
+            }
+            try {
+                TimeUnit.SECONDS.sleep(intervalSec); // converts in long arithmetic, no int overflow
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for callers and stop retrying.
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while waiting to retry", e);
+            }
+        }
+    }
+
@Override
public void close() throws IOException {
executor.shutdown();
+ try {
+ while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @FunctionalInterface
+ private interface Producer<T> {
+ T produce() throws IOException;
}
private static class Segment {