Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2020/05/14 11:11:13 UTC
svn commit: r1877731 [2/4] - in /jackrabbit/oak/trunk: ./
oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/ oak-it/
oak-it/src/test/java/org/apache/jackrabbit/oak/
oak-it/src/test/java/org/apache/jackrabbit/oak/spi/state/ oak-jcr/
oak-parent...
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsPersistence.java Thu May 14 11:11:12 2020
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AwsPersistence implements SegmentNodeStorePersistence {
+
+ private static final Logger log = LoggerFactory.getLogger(AwsPersistence.class);
+
+ protected final AwsContext awsContext;
+
+ private final String fileNameSuffix;
+
+ public AwsPersistence(AwsContext awsContext) {
+ this(awsContext, null);
+ }
+
+ public AwsPersistence(AwsContext awsContext, String id) {
+ if (StringUtils.isNotBlank(id)) {
+ this.awsContext = awsContext.withDirectory(id);
+ this.fileNameSuffix = "." + id;
+ } else {
+ this.awsContext = awsContext;
+ this.fileNameSuffix = "";
+ }
+ }
+
+ @Override
+ public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor,
+ FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
+ return new AwsArchiveManager(awsContext, ioMonitor, fileStoreMonitor);
+ }
+
+ @Override
+ public boolean segmentFilesExist() {
+ try {
+ for (String prefix : awsContext.listPrefixes()) {
+ if (prefix.contains(".tar/")) {
+ return true;
+ }
+ }
+
+ return false;
+ } catch (IOException e) {
+ log.error("Can't check if the segment archives exists", e);
+ return false;
+ }
+ }
+
+ @Override
+ public JournalFile getJournalFile() {
+ return new AwsJournalFile(awsContext, "journal" + fileNameSuffix + ".log");
+ }
+
+ @Override
+ public GCJournalFile getGCJournalFile() throws IOException {
+ return new AwsGCJournalFile(awsContext, "gc" + fileNameSuffix + ".log");
+ }
+
+ @Override
+ public ManifestFile getManifestFile() throws IOException {
+ return new AwsManifestFile(awsContext, "manifest");
+ }
+
+ @Override
+ public RepositoryLock lockRepository() throws IOException {
+ return new AwsRepositoryLock(awsContext, "repo" + fileNameSuffix + ".lock").lock();
+ }
+}
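A minimal usage sketch for the class above (hedged: AwsContext.create(Configuration) is the factory used by AwsSegmentStoreService later in this commit, and the "shard1" id is a made-up example):

    // assumes a Configuration instance named "configuration" is in scope
    AwsContext awsContext = AwsContext.create(configuration);
    SegmentNodeStorePersistence persistence = new AwsPersistence(awsContext, "shard1");
    // the id becomes a directory for the blobs and a suffix for the file names,
    // e.g. getJournalFile() resolves to "journal.shard1.log"
    JournalFile journal = persistence.getJournalFile();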
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsRepositoryLock.java Thu May 14 11:11:12 2020
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
+import com.amazonaws.services.dynamodbv2.LockItem;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AwsRepositoryLock implements RepositoryLock {
+
+ private static final Logger log = LoggerFactory.getLogger(AwsRepositoryLock.class);
+
+ private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.aws.lock.timeout", 0);
+
+ private static final long INTERVAL = 60;
+
+ private final AmazonDynamoDBLockClient lockClient;
+ private final String lockName;
+ private final long timeoutSec;
+
+ private LockItem lockItem;
+
+ public AwsRepositoryLock(AwsContext awsContext, String lockName) {
+ this(awsContext, lockName, TIMEOUT_SEC);
+ }
+
+ public AwsRepositoryLock(AwsContext awsContext, String lockName, int timeoutSec) {
+ this.lockClient = new AmazonDynamoDBLockClient(
+ awsContext.getLockClientOptionsBuilder().withTimeUnit(TimeUnit.SECONDS).withLeaseDuration(INTERVAL)
+ .withHeartbeatPeriod(INTERVAL / 3).withCreateHeartbeatBackgroundThread(true).build());
+ this.lockName = lockName;
+ this.timeoutSec = timeoutSec;
+ }
+
+ public AwsRepositoryLock lock() throws IOException {
+ try {
+ Optional<LockItem> lockItemOptional = lockClient.tryAcquireLock(AcquireLockOptions.builder(lockName)
+ // .withTimeUnit(TimeUnit.SECONDS).withAdditionalTimeToWaitForLock(timeoutSec)
+ .build());
+ if (lockItemOptional.isPresent()) {
+ lockItem = lockItemOptional.get();
+ return this;
+ } else {
+ log.error("Can't acquire the lease in {}s.", timeoutSec);
+ throw new IOException("Can't acquire the lease in " + timeoutSec + "s.");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void unlock() {
+ lockClient.releaseLock(lockItem);
+ }
+}
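A hedged lifecycle sketch for the lock above (the DynamoDB lock table is assumed to be configured on the AwsContext; the lease is renewed by the client's background heartbeat thread):

    // assumes an AwsContext named "awsContext" is in scope
    AwsRepositoryLock lock = new AwsRepositoryLock(awsContext, "repo.lock").lock();
    try {
        // exclusive access to the segment store while the heartbeat renews the lease
    } finally {
        lock.unlock();
    }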
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveEntry.java Thu May 14 11:11:12 2020
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+
+public class AwsSegmentArchiveEntry implements SegmentArchiveEntry {
+
+ private final long msb;
+
+ private final long lsb;
+
+ private final int position;
+
+ private final int length;
+
+ private final int generation;
+
+ private final int fullGeneration;
+
+ private final boolean compacted;
+
+ public AwsSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration,
+ boolean compacted) {
+ this.msb = msb;
+ this.lsb = lsb;
+ this.position = position;
+ this.length = length;
+ this.generation = generation;
+ this.fullGeneration = fullGeneration;
+ this.compacted = compacted;
+ }
+
+ public String getFileName() {
+ return String.format("%04x.%s", getPosition(), new UUID(getMsb(), getLsb()).toString());
+ }
+
+ @Override
+ public long getMsb() {
+ return msb;
+ }
+
+ @Override
+ public long getLsb() {
+ return lsb;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+
+ @Override
+ public int getLength() {
+ return length;
+ }
+
+ @Override
+ public int getGeneration() {
+ return generation;
+ }
+
+ @Override
+ public int getFullGeneration() {
+ return fullGeneration;
+ }
+
+ @Override
+ public boolean isCompacted() {
+ return compacted;
+ }
+}
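getFileName() prefixes the segment UUID with the entry's position rendered as four hex digits, so blob listings sort in write order. A sketch with made-up values:

    AwsSegmentArchiveEntry entry = new AwsSegmentArchiveEntry(msb, lsb, 10, 4096, 1, 1, true);
    // position 10 as "%04x" plus the UUID, e.g. "000a.<uuid-of-msb-lsb>"
    String name = entry.getFileName();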
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java Thu May 14 11:11:12 2020
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import static java.lang.Boolean.getBoolean;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+
+public class AwsSegmentArchiveReader implements SegmentArchiveReader {
+ static final boolean OFF_HEAP = getBoolean("access.off.heap");
+
+ private final AwsContext directoryContext;
+
+ private final String archiveName;
+
+ private final IOMonitor ioMonitor;
+
+ private final long length;
+
+ private final Map<UUID, AwsSegmentArchiveEntry> index = new LinkedHashMap<>();
+
+ private Boolean hasGraph;
+
+ AwsSegmentArchiveReader(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor) throws IOException {
+ this.directoryContext = directoryContext;
+ this.archiveName = archiveName;
+ this.ioMonitor = ioMonitor;
+ long length = 0;
+ for (S3ObjectSummary blob : directoryContext.listObjects("")) {
+ ObjectMetadata allMetadata = directoryContext.getObjectMetadata(blob.getKey());
+ Map<String, String> metadata = allMetadata.getUserMetadata();
+ if (AwsBlobMetadata.isSegment(metadata)) {
+ AwsSegmentArchiveEntry indexEntry = AwsBlobMetadata.toIndexEntry(metadata,
+ (int) allMetadata.getContentLength());
+ index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+ }
+ length += allMetadata.getContentLength();
+ }
+ this.length = length;
+ }
+
+ @Override
+ public Buffer readSegment(long msb, long lsb) throws IOException {
+ AwsSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
+ if (indexEntry == null) {
+ return null;
+ }
+
+ ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Buffer buffer = directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
+ return buffer;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ return index.containsKey(new UUID(msb, lsb));
+ }
+
+ @Override
+ public List<SegmentArchiveEntry> listSegments() {
+ return new ArrayList<>(index.values());
+ }
+
+ @Override
+ public Buffer getGraph() throws IOException {
+ Buffer graph = readObjectToBuffer(getName() + ".gph");
+ hasGraph = graph != null;
+ return graph;
+ }
+
+ @Override
+ public boolean hasGraph() {
+ if (hasGraph == null) {
+ try {
+ getGraph();
+ } catch (IOException ignore) {
+ }
+ }
+ // getGraph() may fail before hasGraph is assigned; avoid unboxing a null Boolean
+ return hasGraph != null && hasGraph;
+ }
+
+ @Override
+ public Buffer getBinaryReferences() throws IOException {
+ return readObjectToBuffer(getName() + ".brf");
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public String getName() {
+ return archiveName;
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ @Override
+ public int getEntrySize(int size) {
+ return size;
+ }
+
+ private Buffer readObjectToBuffer(String name) throws IOException {
+ if (directoryContext.doesObjectExist(name)) {
+ return directoryContext.readObjectToBuffer(name, false);
+ }
+
+ return null;
+ }
+
+ private File pathAsFile() {
+ return new File(directoryContext.getPath());
+ }
+}
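The reader builds its entire index from S3 user metadata in the constructor, so containsSegment() is a pure in-memory lookup and only readSegment() goes back to S3. A hedged sketch (same-package access is assumed, since the constructor is package-private):

    SegmentArchiveReader reader = new AwsSegmentArchiveReader(directoryContext, "data00000a.tar", ioMonitor);
    if (reader.containsSegment(msb, lsb)) {            // in-memory index lookup
        Buffer segment = reader.readSegment(msb, lsb); // one monitored S3 read
    }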
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveWriter.java Thu May 14 11:11:12 2020
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import static org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveReader.OFF_HEAP;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.aws.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+
+public class AwsSegmentArchiveWriter implements SegmentArchiveWriter {
+
+ private final AwsContext directoryContext;
+
+ private final String archiveName;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ private final Optional<SegmentWriteQueue> queue;
+
+ private final Map<UUID, AwsSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
+
+ private int entries;
+
+ private long totalLength;
+
+ private volatile boolean created = false;
+
+ public AwsSegmentArchiveWriter(AwsContext directoryContext, String archiveName, IOMonitor ioMonitor,
+ FileStoreMonitor monitor) {
+ this.directoryContext = directoryContext;
+ this.archiveName = archiveName;
+ this.ioMonitor = ioMonitor;
+ this.monitor = monitor;
+ this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry))
+ : Optional.empty();
+ }
+
+ @Override
+ public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration,
+ boolean compacted) throws IOException {
+ created = true;
+
+ AwsSegmentArchiveEntry entry = new AwsSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration,
+ compacted);
+ if (queue.isPresent()) {
+ queue.get().addToQueue(entry, data, offset, size);
+ } else {
+ doWriteEntry(entry, data, offset, size);
+ }
+ index.put(new UUID(msb, lsb), entry);
+
+ totalLength += size;
+ monitor.written(size);
+ }
+
+ private void doWriteEntry(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+ long msb = indexEntry.getMsb();
+ long lsb = indexEntry.getLsb();
+ String segmentName = indexEntry.getFileName();
+ String fullName = directoryContext.getPath() + segmentName;
+ ioMonitor.beforeSegmentWrite(new File(fullName), msb, lsb, size);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ directoryContext.writeObject(segmentName, data, AwsBlobMetadata.toSegmentMetadata(indexEntry));
+ ioMonitor.afterSegmentWrite(new File(fullName), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ }
+
+ @Override
+ public Buffer readSegment(long msb, long lsb) throws IOException {
+ UUID uuid = new UUID(msb, lsb);
+ Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+ if (segment.isPresent()) {
+ return segment.get().toBuffer();
+ }
+ AwsSegmentArchiveEntry indexEntry = index.get(uuid);
+ if (indexEntry == null) {
+ return null;
+ }
+ return directoryContext.readObjectToBuffer(indexEntry.getFileName(), OFF_HEAP);
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ UUID uuid = new UUID(msb, lsb);
+ Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+ if (segment.isPresent()) {
+ return true;
+ }
+ return index.containsKey(uuid);
+ }
+
+ @Override
+ public void writeGraph(byte[] data) throws IOException {
+ writeDataFile(data, ".gph");
+ }
+
+ @Override
+ public void writeBinaryReferences(byte[] data) throws IOException {
+ writeDataFile(data, ".brf");
+ }
+
+ private void writeDataFile(byte[] data, String extension) throws IOException {
+ directoryContext.writeObject(getName() + extension, data);
+ totalLength += data.length;
+ monitor.written(data.length);
+ }
+
+ @Override
+ public long getLength() {
+ return totalLength;
+ }
+
+ @Override
+ public int getEntryCount() {
+ return index.size();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (queue.isPresent()) { // required to handle IOException
+ SegmentWriteQueue q = queue.get();
+ q.flush();
+ q.close();
+ }
+ directoryContext.writeObject("closed", new byte[0]);
+ }
+
+ @Override
+ public boolean isCreated() {
+ return created || !queueIsEmpty();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (queue.isPresent()) { // required to handle IOException
+ queue.get().flush();
+ }
+ }
+
+ private boolean queueIsEmpty() {
+ return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+ }
+
+ @Override
+ public String getName() {
+ return archiveName;
+ }
+}
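When oak.segment.aws.threads is positive, writes are queued and persisted asynchronously, but readSegment() consults the queue first, so a segment is readable immediately after writeSegment() returns. A hedged sketch (writer, data and the generation values are assumed to be in scope):

    writer.writeSegment(msb, lsb, data, 0, data.length, generation, fullGeneration, true);
    Buffer buffer = writer.readSegment(msb, lsb); // served from the queue if not yet persisted
    writer.flush();                               // blocks until all queued segments reach S3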
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentStoreService.java Thu May 14 11:11:12 2020
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+
+@Component(configurationPolicy = ConfigurationPolicy.REQUIRE, configurationPid = { Configuration.PID })
+public class AwsSegmentStoreService {
+
+ public static final String DEFAULT_BUCKET_NAME = "oak";
+
+ public static final String DEFAULT_ROOT_DIRECTORY = "oak/";
+
+ public static final String DEFAULT_JOURNALTABLE_NAME = "oakjournaltable";
+
+ public static final String DEFAULT_LOCKTABLE_NAME = "oaklocktable";
+
+ public static final String DEFAULT_REGION_NAME = "us-west-2";
+
+ private ServiceRegistration registration;
+
+ private SegmentNodeStorePersistence persistence;
+
+ @Activate
+ public void activate(ComponentContext context, Configuration config) throws IOException {
+ persistence = createAwsPersistence(config);
+ registration = context.getBundleContext().registerService(SegmentNodeStorePersistence.class.getName(),
+ persistence, new Properties());
+ }
+
+ @Deactivate
+ public void deactivate() throws IOException {
+ if (registration != null) {
+ registration.unregister();
+ registration = null;
+ }
+ persistence = null;
+ }
+
+ private static SegmentNodeStorePersistence createAwsPersistence(Configuration configuration) throws IOException {
+ AwsContext awsContext = AwsContext.create(configuration);
+ AwsPersistence persistence = new AwsPersistence(awsContext);
+ return persistence;
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/Configuration.java Thu May 14 11:11:12 2020
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws;
+
+import static org.apache.jackrabbit.oak.segment.aws.Configuration.PID;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(
+ pid = {PID},
+ name = "Apache Jackrabbit Oak Azure Segment Store Service",
+ description = "Azure backend for the Oak Segment Node Store")
+@interface Configuration {
+
+ String PID = "org.apache.jackrabbit.oak.segment.aws.AwsSegmentStoreService";
+
+ @AttributeDefinition(
+ name = "AWS account access key",
+ description = "Access key which should be used to authenticate on the account")
+ String accessKey();
+
+ @AttributeDefinition(
+ name = "AWS bucket name",
+ description = "Name of the bucket to use. If it doesn't exists, it'll be created.")
+ String bucketName();
+
+ @AttributeDefinition(
+ name = "Root directory",
+ description = "Names of all the created blobs will be prefixed with this path")
+ String journalTableName() default AwsSegmentStoreService.DEFAULT_JOURNALTABLE_NAME;
+
+ @AttributeDefinition(
+ name = "Root directory",
+ description = "Names of all the created blobs will be prefixed with this path")
+ String lockTableName() default AwsSegmentStoreService.DEFAULT_LOCKTABLE_NAME;
+
+ @AttributeDefinition(
+ name = "AWS region",
+ description = "AWS region in which the resources are created or expected to be present")
+ String region() default AwsSegmentStoreService.DEFAULT_REGION_NAME;
+
+ @AttributeDefinition(
+ name = "Root directory",
+ description = "Names of all the created blobs will be prefixed with this path")
+ String rootDirectory() default AwsSegmentStoreService.DEFAULT_ROOT_DIRECTORY;
+
+ @AttributeDefinition(
+ name = "AWS account secret key",
+ description = "Secret key which should be used to authenticate on the account")
+ String secretKey();
+
+ @AttributeDefinition(
+ name = "AWS session token",
+ description = "Session token which should be used to authenticate on the account")
+ String sessionToken() default "";
+}
\ No newline at end of file
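With configuration policy REQUIRE, AwsSegmentStoreService only activates once a configuration with this PID is supplied. A hedged example (the file name follows the usual OSGi convention; all values are placeholders):

    # org.apache.jackrabbit.oak.segment.aws.AwsSegmentStoreService.config
    accessKey="..."
    secretKey="..."
    bucketName="oak"
    rootDirectory="oak/"
    journalTableName="oakjournaltable"
    lockTableName="oaklocktable"
    region="us-west-2"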
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteAction.java Thu May 14 11:11:12 2020
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.queue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveEntry;
+
+public class SegmentWriteAction {
+
+ private final AwsSegmentArchiveEntry indexEntry;
+
+ private final byte[] buffer;
+
+ private final int offset;
+
+ private final int length;
+
+ public SegmentWriteAction(AwsSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
+ this.indexEntry = indexEntry;
+
+ // copy the relevant slice defensively so the caller may reuse its array
+ this.buffer = new byte[length];
+ System.arraycopy(buffer, offset, this.buffer, 0, length);
+ this.offset = 0;
+ this.length = length;
+ }
+
+ public UUID getUuid() {
+ return new UUID(indexEntry.getMsb(), indexEntry.getLsb());
+ }
+
+ public Buffer toBuffer() {
+ return Buffer.wrap(buffer, offset, length);
+ }
+
+ void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
+ consumer.consume(indexEntry, buffer, offset, length);
+ }
+
+ @Override
+ public String toString() {
+ return getUuid().toString();
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/queue/SegmentWriteQueue.java Thu May 14 11:11:12 2020
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.queue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.jackrabbit.oak.segment.aws.AwsSegmentArchiveEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentWriteQueue implements Closeable {
+
+ public static final int THREADS = Integer.getInteger("oak.segment.aws.threads", 5);
+
+ private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.aws.queue", 20);
+
+ private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);
+
+ private final BlockingDeque<SegmentWriteAction> queue;
+
+ private final Map<UUID, SegmentWriteAction> segmentsByUUID;
+
+ private final ExecutorService executor;
+
+ private final ReadWriteLock flushLock;
+
+ private final SegmentConsumer writer;
+
+ private volatile boolean shutdown;
+
+ private final Object brokenMonitor = new Object();
+
+ private volatile boolean broken;
+
+ public SegmentWriteQueue(SegmentConsumer writer) {
+ this(writer, QUEUE_SIZE, THREADS);
+ }
+
+ SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
+ this.writer = writer;
+ segmentsByUUID = new ConcurrentHashMap<>();
+ flushLock = new ReentrantReadWriteLock();
+
+ queue = new LinkedBlockingDeque<>(queueSize);
+ executor = Executors.newFixedThreadPool(threadNo + 1);
+ for (int i = 0; i < threadNo; i++) {
+ executor.submit(this::mainLoop);
+ }
+ executor.submit(this::emergencyLoop);
+ }
+
+ private void mainLoop() {
+ while (!shutdown) {
+ try {
+ waitWhileBroken();
+ if (shutdown) {
+ break;
+ }
+ consume();
+ } catch (SegmentConsumeException e) {
+ SegmentWriteAction segment = e.segment;
+ log.error("Can't persist the segment {}", segment.getUuid(), e.getCause());
+ try {
+ queue.put(segment);
+ } catch (InterruptedException e1) {
+ log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
+ }
+ }
+ }
+ }
+
+ private void consume() throws SegmentConsumeException {
+ SegmentWriteAction segment = null;
+ try {
+ segment = queue.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("Poll from queue interrupted", e);
+ }
+ if (segment != null) {
+ consume(segment);
+ }
+ }
+
+ private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
+ try {
+ segment.passTo(writer);
+ } catch (IOException e) {
+ setBroken(true);
+ throw new SegmentConsumeException(segment, e);
+ }
+ synchronized (segmentsByUUID) {
+ segmentsByUUID.remove(segment.getUuid());
+ segmentsByUUID.notifyAll();
+ }
+ setBroken(false);
+ }
+
+ private void emergencyLoop() {
+ while (!shutdown) {
+ waitUntilBroken();
+ if (shutdown) {
+ break;
+ }
+
+ boolean success = false;
+ SegmentWriteAction segmentToRetry = null;
+ do {
+ try {
+ if (segmentToRetry == null) {
+ consume();
+ } else {
+ consume(segmentToRetry);
+ }
+ success = true;
+ } catch (SegmentConsumeException e) {
+ segmentToRetry = e.segment;
+ log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ log.warn("Interrupted", e);
+ }
+ if (shutdown) {
+ log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
+ }
+ }
+ } while (!success && !shutdown);
+ }
+ }
+
+ public void addToQueue(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+ waitWhileBroken();
+ if (shutdown) {
+ throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
+ }
+
+ SegmentWriteAction action = new SegmentWriteAction(indexEntry, data, offset, size);
+ flushLock.readLock().lock();
+ try {
+ segmentsByUUID.put(action.getUuid(), action);
+ if (!queue.offer(action, 1, TimeUnit.MINUTES)) {
+ segmentsByUUID.remove(action.getUuid());
+ throw new IOException("Can't add segment to the queue");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ flushLock.readLock().unlock();
+ }
+ }
+
+ public void flush() throws IOException {
+ flushLock.writeLock().lock();
+ try {
+ synchronized (segmentsByUUID) {
+ long start = System.currentTimeMillis();
+ while (!segmentsByUUID.isEmpty()) {
+ segmentsByUUID.wait(100);
+ if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) {
+ log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue,
+ segmentsByUUID);
+ start = System.currentTimeMillis();
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ flushLock.writeLock().unlock();
+ }
+ }
+
+ public SegmentWriteAction read(UUID id) {
+ return segmentsByUUID.get(id);
+ }
+
+ public void close() throws IOException {
+ shutdown = true;
+ try {
+ executor.shutdown();
+ if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+ throw new IOException("The write wasn't able to shut down clearly");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public boolean isEmpty() {
+ return segmentsByUUID.isEmpty();
+ }
+
+ boolean isBroken() {
+ return broken;
+ }
+
+ int getSize() {
+ return queue.size();
+ }
+
+ private void setBroken(boolean broken) {
+ synchronized (brokenMonitor) {
+ this.broken = broken;
+ brokenMonitor.notifyAll();
+ }
+ }
+
+ private void waitWhileBroken() {
+ if (!broken) {
+ return;
+ }
+ synchronized (brokenMonitor) {
+ while (broken && !shutdown) {
+ try {
+ brokenMonitor.wait(100);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ private void waitUntilBroken() {
+ if (broken) {
+ return;
+ }
+ synchronized (brokenMonitor) {
+ while (!broken && !shutdown) {
+ try {
+ brokenMonitor.wait(100);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ public interface SegmentConsumer {
+ void consume(AwsSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;
+ }
+
+ public static class SegmentConsumeException extends Exception {
+ private static final long serialVersionUID = 5778182681577974722L;
+
+ private final SegmentWriteAction segment;
+
+ public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
+ super(cause);
+ this.segment = segment;
+ }
+ }
+}
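A hedged usage sketch for the queue (the consumer lambda below is illustrative; "entry" and "data" are assumed to be in scope):

    SegmentWriteQueue queue = new SegmentWriteQueue((indexEntry, bytes, offset, size) -> {
        // persist the segment, e.g. to S3; a thrown IOException marks the queue
        // broken and the emergency loop keeps retrying until success or shutdown
    });
    queue.addToQueue(entry, data, 0, data.length);
    queue.flush();  // waits until every queued segment has been consumed
    queue.close();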
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsCompact.java Thu May 14 11:11:12 2020
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.segment.SegmentCache.DEFAULT_SEGMENT_CACHE_MB;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.newFileStore;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.newSegmentNodeStorePersistence;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.printableStopwatch;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.io.Files;
+
+import org.apache.jackrabbit.oak.segment.SegmentCache;
+import org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.JournalReader;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.tool.Compact;
+
+/**
+ * Perform an offline compaction of an existing AWS Segment Store.
+ */
+public class AwsCompact {
+
+ /**
+ * Create a builder for the {@link Compact} command.
+ *
+ * @return an instance of {@link Builder}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Collect options for the {@link Compact} command.
+ */
+ public static class Builder {
+
+ private String path;
+
+ private boolean force;
+
+ private long gcLogInterval = 150000;
+
+ private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB;
+
+ private Builder() {
+ // Prevent external instantiation.
+ }
+
+ /**
+ * The path (URI) to an existing segment store. This parameter is required.
+ *
+ * @param path the path to an existing segment store.
+ * @return this builder.
+ */
+ public Builder withPath(String path) {
+ this.path = checkNotNull(path);
+ return this;
+ }
+
+ /**
+ * Whether to fail if run on an older version of the store or to force upgrading
+ * its format.
+ *
+ * @param force upgrade iff {@code true}
+ * @return this builder.
+ */
+ public Builder withForce(boolean force) {
+ this.force = force;
+ return this;
+ }
+
+ /**
+ * The size of the segment cache in MB. The default of
+ * {@link SegmentCache#DEFAULT_SEGMENT_CACHE_MB} is used when this method is
+ * not invoked.
+ *
+ * @param segmentCacheSize cache size in MB
+ * @return this builder
+ * @throws IllegalArgumentException if {@code segmentCacheSize} is not a
+ * positive integer.
+ */
+ public Builder withSegmentCacheSize(int segmentCacheSize) {
+ checkArgument(segmentCacheSize > 0, "segmentCacheSize must be strictly positive");
+ this.segmentCacheSize = segmentCacheSize;
+ return this;
+ }
+
+ /**
+ * The number of nodes after which an update about the compaction process is
+ * logged. Set to a negative number to disable progress logging. If not
+ * specified, it defaults to 150,000 nodes.
+ *
+ * @param gcLogInterval The log interval.
+ * @return this builder.
+ */
+ public Builder withGCLogInterval(long gcLogInterval) {
+ this.gcLogInterval = gcLogInterval;
+ return this;
+ }
+
+ /**
+ * Create an executable version of the {@link Compact} command.
+ *
+ * @return an instance of {@link Runnable}.
+ */
+ public AwsCompact build() {
+ checkNotNull(path);
+ return new AwsCompact(this);
+ }
+ }
+
+ private final String path;
+
+ private final int segmentCacheSize;
+
+ private final boolean strictVersionCheck;
+
+ private final long gcLogInterval;
+
+ private AwsCompact(Builder builder) {
+ this.path = builder.path;
+ this.segmentCacheSize = builder.segmentCacheSize;
+ this.strictVersionCheck = !builder.force;
+ this.gcLogInterval = builder.gcLogInterval;
+ }
+
+ public int run() throws IOException {
+ Stopwatch watch = Stopwatch.createStarted();
+ SegmentNodeStorePersistence persistence = newSegmentNodeStorePersistence(SegmentStoreType.AWS, path);
+ SegmentArchiveManager archiveManager = persistence.createArchiveManager(false, false, new IOMonitorAdapter(),
+ new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+
+ System.out.printf("Compacting %s\n", path);
+ System.out.printf(" before\n");
+ List<String> beforeArchives = Collections.emptyList();
+ try {
+ beforeArchives = archiveManager.listArchives();
+ } catch (IOException e) {
+ System.err.println(e);
+ }
+
+ printArchives(System.out, beforeArchives);
+ System.out.printf(" -> compacting\n");
+
+ try (FileStore store = newFileStore(persistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize,
+ gcLogInterval)) {
+ if (!store.compactFull()) {
+ System.out.printf("Compaction cancelled after %s.\n", printableStopwatch(watch));
+ return 1;
+ }
+ System.out.printf(" -> cleaning up\n");
+ store.cleanup();
+ JournalFile journal = persistence.getJournalFile();
+ String head;
+ try (JournalReader journalReader = new JournalReader(journal)) {
+ head = String.format("%s root %s\n", journalReader.next().getRevision(), System.currentTimeMillis());
+ }
+
+ try (JournalFileWriter journalWriter = journal.openJournalWriter()) {
+ System.out.printf(" -> writing new %s: %s\n", journal.getName(), head);
+ journalWriter.truncate();
+ journalWriter.writeLine(head);
+ }
+ } catch (Exception e) {
+ watch.stop();
+ e.printStackTrace(System.err);
+ System.out.printf("Compaction failed after %s.\n", printableStopwatch(watch));
+ return 1;
+ }
+
+ watch.stop();
+ System.out.printf(" after\n");
+ List<String> afterArchives = Collections.emptyList();
+ try {
+ afterArchives = archiveManager.listArchives();
+ } catch (IOException e) {
+ System.err.println(e);
+ }
+ printArchives(System.out, afterArchives);
+ System.out.printf("Compaction succeeded in %s.\n", printableStopwatch(watch));
+ return 0;
+ }
+
+ private static void printArchives(PrintStream s, List<String> archives) {
+ for (String a : archives) {
+ s.printf(" %s\n", a);
+ }
+ }
+}
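A hedged invocation sketch (the "aws:..." URI is a placeholder; its exact format is parsed by AwsToolUtils, which is not part of this excerpt):

    int exitCode = AwsCompact.builder()
            .withPath("aws:...")
            .withForce(false)
            .withSegmentCacheSize(DEFAULT_SEGMENT_CACHE_MB)
            .withGCLogInterval(150000)
            .build()
            .run();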
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentCopy.java Thu May 14 11:11:12 2020
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.newSegmentNodeStorePersistence;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.printMessage;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.printableStopwatch;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.storeDescription;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.storeTypeFromPathOrUri;
+
+import java.io.PrintWriter;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+
+/**
+ * Perform a full copy of repository data at the segment level.
+ */
+public class AwsSegmentCopy {
+ /**
+ * Create a builder for the {@link AwsSegmentCopy} command.
+ *
+ * @return an instance of {@link Builder}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static boolean canExecute(String source, String destination) {
+ return source.startsWith("aws:") || destination.startsWith("aws:");
+ }
+
+ /**
+ * Collect options for the {@link AwsSegmentCopy} command.
+ */
+ public static class Builder {
+
+ private String source;
+
+ private String destination;
+
+ private SegmentNodeStorePersistence srcPersistence;
+
+ private SegmentNodeStorePersistence destPersistence;
+
+ private PrintWriter outWriter;
+
+ private PrintWriter errWriter;
+
+ private Builder() {
+ // Prevent external instantiation.
+ }
+
+ /**
+ * The source path/URI to an existing segment store. This parameter is required.
+ *
+ * @param source the source path/URI to an existing segment store.
+ * @return this builder.
+ */
+ public Builder withSource(String source) {
+ this.source = checkNotNull(source);
+ return this;
+ }
+
+ /**
+ * The destination path/URI to an existing segment store. This parameter is
+ * required.
+ *
+ * @param destination the destination path/URI to an existing segment store.
+ * @return this builder.
+ */
+ public Builder withDestination(String destination) {
+ this.destination = checkNotNull(destination);
+ return this;
+ }
+
+ /**
+ * The source {@link SegmentNodeStorePersistence}.
+ *
+ * @param srcPersistence the source {@link SegmentNodeStorePersistence}.
+ * @return this builder.
+ */
+ public Builder withSrcPersistence(SegmentNodeStorePersistence srcPersistence) {
+ this.srcPersistence = checkNotNull(srcPersistence);
+ return this;
+ }
+
+ /**
+ * The destination {@link SegmentNodeStorePersistence}.
+ *
+ * @param destPersistence the destination {@link SegmentNodeStorePersistence}.
+ * @return this builder.
+ */
+ public Builder withDestPersistence(SegmentNodeStorePersistence destPersistence) {
+ this.destPersistence = checkNotNull(destPersistence);
+ return this;
+ }
+
+ /**
+ * The text output stream writer used to print normal output.
+ *
+ * @param outWriter the output writer.
+ * @return this builder.
+ */
+ public Builder withOutWriter(PrintWriter outWriter) {
+ this.outWriter = outWriter;
+ return this;
+ }
+
+ /**
+ * The text error stream writer used to print erroneous output.
+ *
+ * @param errWriter the error writer.
+ * @return this builder.
+ */
+ public Builder withErrWriter(PrintWriter errWriter) {
+ this.errWriter = errWriter;
+ return this;
+ }
+
+ /**
+ * Create an executable version of the {@link AwsSegmentCopy} command.
+ *
+ * @return an instance of {@link Runnable}.
+ */
+ public AwsSegmentCopy build() {
+ if (srcPersistence == null && destPersistence == null) {
+ checkNotNull(source);
+ checkNotNull(destination);
+ }
+ return new AwsSegmentCopy(this);
+ }
+ }
+
+ private final String source;
+
+ private final String destination;
+
+ private final PrintWriter outWriter;
+
+ private final PrintWriter errWriter;
+
+ private SegmentNodeStorePersistence srcPersistence;
+
+ private SegmentNodeStorePersistence destPersistence;
+
+ public AwsSegmentCopy(Builder builder) {
+ this.source = builder.source;
+ this.destination = builder.destination;
+ this.srcPersistence = builder.srcPersistence;
+ this.destPersistence = builder.destPersistence;
+ this.outWriter = builder.outWriter;
+ this.errWriter = builder.errWriter;
+ }
+
+ public int run() {
+ Stopwatch watch = Stopwatch.createStarted();
+
+ SegmentStoreType srcType = storeTypeFromPathOrUri(source);
+ SegmentStoreType destType = storeTypeFromPathOrUri(destination);
+
+ String srcDescription = storeDescription(srcType, source);
+ String destDescription = storeDescription(destType, destination);
+
+ try {
+ if (srcPersistence == null || destPersistence == null) {
+ srcPersistence = newSegmentNodeStorePersistence(srcType, source);
+ destPersistence = newSegmentNodeStorePersistence(destType, destination);
+ }
+
+ printMessage(outWriter, "Started segment-copy transfer!");
+ printMessage(outWriter, "Source: {0}", srcDescription);
+ printMessage(outWriter, "Destination: {0}", destDescription);
+
+ AwsSegmentStoreMigrator migrator = new AwsSegmentStoreMigrator.Builder()
+ .withSourcePersistence(srcPersistence, srcDescription)
+ .withTargetPersistence(destPersistence, destDescription).build();
+
+ migrator.migrate();
+ } catch (Exception e) {
+ watch.stop();
+ printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source, destination);
+ e.printStackTrace(errWriter);
+ return 1;
+ }
+
+ watch.stop();
+ printMessage(outWriter, "Segment-copy succeeded in {0}", printableStopwatch(watch));
+
+ return 0;
+ }
+}
\ No newline at end of file
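A hedged invocation sketch (the "aws:..." URIs are placeholders parsed by AwsToolUtils):

    int exitCode = AwsSegmentCopy.builder()
            .withSource("aws:...")
            .withDestination("aws:...")
            .withOutWriter(new PrintWriter(System.out))
            .withErrWriter(new PrintWriter(System.err))
            .build()
            .run();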
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsSegmentStoreMigrator.java Thu May 14 11:11:12 2020
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.fetchByteArray;
+import static org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.storeDescription;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.AwsContext;
+import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
+import org.apache.jackrabbit.oak.segment.aws.tool.AwsToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
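+/**
+ * Copies the content of one segment store (journal, gc journal, manifest and
+ * segment archives) to another, optionally in append mode. A minimal usage
+ * sketch, assuming an already configured {@code AwsContext}:
+ *
+ * <pre>
+ * try (AwsSegmentStoreMigrator migrator = new AwsSegmentStoreMigrator.Builder()
+ *         .withSource(new File("/path/to/segmentstore"))
+ *         .withTarget(awsContext)
+ *         .build()) {
+ *     migrator.migrate();
+ * }
+ * </pre>
+ */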
+public class AwsSegmentStoreMigrator implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(AwsSegmentStoreMigrator.class);
+
+ private static final int READ_THREADS = 20;
+
+ private final SegmentNodeStorePersistence source;
+
+ private final SegmentNodeStorePersistence target;
+
+ private final String sourceName;
+
+ private final String targetName;
+
+ private final boolean appendMode;
+
+ private final boolean onlyLastJournalEntry;
+
+ private ExecutorService executor = Executors.newFixedThreadPool(READ_THREADS + 1);
+
+ private AwsSegmentStoreMigrator(Builder builder) {
+ this.source = builder.source;
+ this.target = builder.target;
+ this.sourceName = builder.sourceName;
+ this.targetName = builder.targetName;
+ this.appendMode = builder.appendMode;
+ this.onlyLastJournalEntry = builder.onlyLastJournalEntry;
+ }
+
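+ /**
+ * Migrates the journal, gc journal and manifest (each retried up to 16
+ * times at 5-second intervals), followed by the segment archives.
+ */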
+ public void migrate() throws IOException, ExecutionException, InterruptedException {
+ runWithRetry(() -> migrateJournal(), 16, 5);
+ runWithRetry(() -> migrateGCJournal(), 16, 5);
+ runWithRetry(() -> migrateManifest(), 16, 5);
+ migrateArchives();
+ }
+
+ private Void migrateJournal() throws IOException {
+ log.info("{}/journal.log -> {}", sourceName, targetName);
+ if (!source.getJournalFile().exists()) {
+ log.info("No journal at {}; skipping.", sourceName);
+ return null;
+ }
+ List<String> journal = new ArrayList<>();
+
+ try (JournalFileReader reader = source.getJournalFile().openJournalReader()) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (!line.trim().isEmpty()) {
+ journal.add(line);
+ }
+ if (!journal.isEmpty() && onlyLastJournalEntry) {
+ break;
+ }
+ }
+ }
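+ // The journal reader yields entries newest-first (hence a single entry
+ // suffices above for onlyLastJournalEntry); reverse so the target journal
+ // is written in chronological order, ending with the current head.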
+ Collections.reverse(journal);
+
+ try (JournalFileWriter writer = target.getJournalFile().openJournalWriter()) {
+ writer.truncate();
+ for (String line : journal) {
+ writer.writeLine(line);
+ }
+ }
+ return null;
+ }
+
+ private Void migrateGCJournal() throws IOException {
+ log.info("{}/gc.log -> {}", sourceName, targetName);
+ GCJournalFile targetGCJournal = target.getGCJournalFile();
+ if (appendMode) {
+ targetGCJournal.truncate();
+ }
+ for (String line : source.getGCJournalFile().readLines()) {
+ targetGCJournal.writeLine(line);
+ }
+ return null;
+ }
+
+ private Void migrateManifest() throws IOException {
+ log.info("{}/manifest -> {}", sourceName, targetName);
+ if (!source.getManifestFile().exists()) {
+ log.info("No manifest at {}; skipping.", sourceName);
+ return null;
+ }
+ Properties manifest = source.getManifestFile().load();
+ target.getManifestFile().save(manifest);
+ return null;
+ }
+
+ private void migrateArchives() throws IOException, ExecutionException, InterruptedException {
+ if (!source.segmentFilesExist()) {
+ log.info("No segment archives at {}; skipping.", sourceName);
+ return;
+ }
+ SegmentArchiveManager sourceManager = source.createArchiveManager(false, false, new IOMonitorAdapter(),
+ new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ SegmentArchiveManager targetManager = target.createArchiveManager(false, false, new IOMonitorAdapter(),
+ new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ List<String> targetArchives = targetManager.listArchives();
+ for (String archiveName : sourceManager.listArchives()) {
+ log.info("{}/{} -> {}", sourceName, archiveName, targetName);
+ if (appendMode && targetArchives.contains(archiveName)) {
+ log.info("Already exists, skipping.");
+ continue;
+ }
+ try (SegmentArchiveReader reader = sourceManager.forceOpen(archiveName)) {
+ SegmentArchiveWriter writer = targetManager.create(archiveName);
+ try {
+ migrateSegments(reader, writer);
+ migrateBinaryRef(reader, writer);
+ migrateGraph(reader, writer);
+ } catch (Exception e) {
+ log.error("Can't write archive", e);
+ throw e;
+ } finally {
+ writer.close();
+ }
+ }
+ }
+ }
+
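+ /**
+ * Reads the segments of an archive concurrently on the shared executor,
+ * then writes them to the target sequentially in archive order.
+ */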
+ private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws ExecutionException, InterruptedException, IOException {
+ List<Future<Segment>> futures = new ArrayList<>();
+ for (SegmentArchiveEntry entry : reader.listSegments()) {
+ futures.add(executor.submit(() -> runWithRetry(() -> {
+ Segment segment = new Segment(entry);
+ segment.read(reader);
+ return segment;
+ }, 16, 5)));
+ }
+
+ for (Future<Segment> future : futures) {
+ Segment segment = future.get();
+ segment.write(writer);
+ }
+ }
+
+ private void migrateBinaryRef(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws IOException {
+ Buffer binaryReferences = reader.getBinaryReferences();
+ if (binaryReferences != null) {
+ byte[] array = fetchByteArray(binaryReferences);
+ writer.writeBinaryReferences(array);
+ }
+ }
+
+ private void migrateGraph(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws IOException {
+ if (reader.hasGraph()) {
+ Buffer graph = reader.getGraph();
+ byte[] array = fetchByteArray(graph);
+ writer.writeGraph(array);
+ }
+ }
+
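+ /**
+ * Invokes {@code producer}, retrying {@link IOException} and
+ * {@link RepositoryNotReachableException} up to {@code maxAttempts} times
+ * with {@code intervalSec} seconds between attempts, and rethrows the
+ * last failure when all attempts are exhausted.
+ */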
+ private static <T> T runWithRetry(Producer<T> producer, int maxAttempts, int intervalSec) throws IOException {
+ IOException ioException = null;
+ RepositoryNotReachableException repoNotReachableException = null;
+ for (int i = 0; i < maxAttempts; i++) {
+ try {
+ return producer.produce();
+ } catch (IOException e) {
+ log.error("Can't execute the operation. Retrying (attempt {})", i, e);
+ ioException = e;
+ } catch (RepositoryNotReachableException e) {
+ log.error("Can't execute the operation. Retrying (attempt {})", i, e);
+ repoNotReachableException = e;
+ }
+ try {
+ TimeUnit.SECONDS.sleep(intervalSec);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers can observe the interruption.
+ Thread.currentThread().interrupt();
+ log.error("Interrupted", e);
+ }
+ }
+ if (ioException != null) {
+ throw ioException;
+ } else if (repoNotReachableException != null) {
+ throw repoNotReachableException;
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ executor.shutdown();
+ try {
+ while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
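+ /** A unit of work that may fail with an I/O error, retried by runWithRetry. */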
+ @FunctionalInterface
+ private interface Producer<T> {
+ T produce() throws IOException;
+ }
+
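+ /** In-memory copy of a single segment: read concurrently, written sequentially. */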
+ private static class Segment {
+
+ private final SegmentArchiveEntry entry;
+
+ private volatile Buffer data;
+
+ private Segment(SegmentArchiveEntry entry) {
+ this.entry = entry;
+ }
+
+ private void read(SegmentArchiveReader reader) throws IOException {
+ data = reader.readSegment(entry.getMsb(), entry.getLsb());
+ }
+
+ private void write(SegmentArchiveWriter writer) throws IOException {
+ final byte[] array = data.array();
+ final int offset = 0;
+ writer.writeSegment(entry.getMsb(), entry.getLsb(), array, offset, entry.getLength(), entry.getGeneration(),
+ entry.getFullGeneration(), entry.isCompacted());
+ }
+
+ @Override
+ public String toString() {
+ return new UUID(entry.getMsb(), entry.getLsb()).toString();
+ }
+ }
+
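+ /**
+ * Fluent builder for {@link AwsSegmentStoreMigrator}. A source and a
+ * target (a local TAR directory or an AWS context) are expected to be
+ * configured before calling {@link #build()}.
+ */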
+ public static class Builder {
+
+ private SegmentNodeStorePersistence source;
+
+ private SegmentNodeStorePersistence target;
+
+ private String sourceName;
+
+ private String targetName;
+
+ private boolean appendMode;
+
+ private boolean onlyLastJournalEntry;
+
+ public Builder withSource(File dir) {
+ this.source = new TarPersistence(dir);
+ this.sourceName = storeDescription(SegmentStoreType.TAR, dir.getPath());
+ return this;
+ }
+
+ public Builder withSource(AwsContext awsContext) {
+ this.source = new AwsPersistence(awsContext);
+ this.sourceName = storeDescription(SegmentStoreType.AWS, awsContext.getConfig());
+ return this;
+ }
+
+ public Builder withSourcePersistence(SegmentNodeStorePersistence source, String sourceName) {
+ this.source = source;
+ this.sourceName = sourceName;
+ return this;
+ }
+
+ public Builder withTargetPersistence(SegmentNodeStorePersistence target, String targetName) {
+ this.target = target;
+ this.targetName = targetName;
+ return this;
+ }
+
+ public Builder withTarget(File dir) {
+ this.target = new TarPersistence(dir);
+ this.targetName = storeDescription(SegmentStoreType.TAR, dir.getPath());
+ return this;
+ }
+
+ public Builder withTarget(AwsContext awsContext) {
+ this.target = new AwsPersistence(awsContext);
+ this.targetName = storeDescription(SegmentStoreType.AWS, awsContext.getConfig());
+ return this;
+ }
+
+ public Builder setAppendMode() {
+ this.appendMode = true;
+ return this;
+ }
+
+ public Builder withOnlyLastJournalEntry() {
+ this.onlyLastJournalEntry = true;
+ return this;
+ }
+
+ public AwsSegmentStoreMigrator build() {
+ return new AwsSegmentStoreMigrator(this);
+ }
+ }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java?rev=1877731&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java (added)
+++ jackrabbit/oak/trunk/oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/tool/AwsToolUtils.java Thu May 14 11:11:12 2020
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.aws.tool;
+
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.aws.AwsContext;
+import org.apache.jackrabbit.oak.segment.aws.AwsPersistence;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+
+/**
+ * Utility class with helpers shared by the AWS segment store tooling:
+ * store construction, path/URI parsing and console output.
+ */
+public class AwsToolUtils {
+
+ private AwsToolUtils() {
+ // prevent instantiation
+ }
+
+ public enum SegmentStoreType {
+ TAR("TarMK Segment Store"), AWS("AWS Segment Store");
+
+ private String type;
+
+ SegmentStoreType(String type) {
+ this.type = type;
+ }
+
+ public String description(String pathOrUri) {
+ String location = pathOrUri;
+ if (pathOrUri.startsWith("aws:")) {
+ // Strip the four-character "aws:" prefix, consistent with the parsing
+ // in newSegmentNodeStorePersistence.
+ location = pathOrUri.substring(4);
+ }
+
+ return type + "@" + location;
+ }
+ }
+
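+ /**
+ * Builds a {@link FileStore} suitable for offline tooling: memory mapping
+ * is disabled and garbage collection runs with offline options.
+ */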
+ public static FileStore newFileStore(SegmentNodeStorePersistence persistence, File directory,
+ boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval)
+ throws IOException, InvalidFileStoreVersionException, URISyntaxException {
+ FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(directory)
+ .withCustomPersistence(persistence)
+ .withMemoryMapping(false)
+ .withStrictVersionCheck(strictVersionCheck)
+ .withSegmentCacheSize(segmentCacheSize)
+ .withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval));
+
+ return builder.build();
+ }
+
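+ /**
+ * Creates the persistence for the given store type. An AWS URI is expected
+ * to carry four semicolon-separated parts after the {@code aws:} prefix
+ * (presumably bucket name, root directory, journal table and lock table,
+ * in the order accepted by {@code AwsContext.create}); any other path is
+ * treated as a local TAR directory.
+ */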
+ public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType,
+ String pathOrUri) throws IOException {
+ SegmentNodeStorePersistence persistence = null;
+
+ switch (storeType) {
+ case AWS:
+ String[] parts = pathOrUri.substring(4).split(";");
+ AwsContext awsContext = AwsContext.create(parts[0], parts[1], parts[2], parts[3]);
+ persistence = new AwsPersistence(awsContext);
+ break;
+ default:
+ persistence = new TarPersistence(new File(pathOrUri));
+ }
+
+ return persistence;
+ }
+
+ public static SegmentStoreType storeTypeFromPathOrUri(String pathOrUri) {
+ if (pathOrUri.startsWith("aws:")) {
+ return SegmentStoreType.AWS;
+ }
+
+ return SegmentStoreType.TAR;
+ }
+
+ public static String storeDescription(SegmentStoreType storeType, String pathOrUri) {
+ return storeType.description(pathOrUri);
+ }
+
+ public static String printableStopwatch(Stopwatch s) {
+ return String.format("%s (%ds)", s, s.elapsed(TimeUnit.SECONDS));
+ }
+
+ public static void printMessage(PrintWriter pw, String format, Object... arg) {
+ pw.println(MessageFormat.format(format, arg));
+ }
+
+ public static byte[] fetchByteArray(Buffer buffer) throws IOException {
+ byte[] data = new byte[buffer.remaining()];
+ buffer.get(data);
+ return data;
+ }
+}