You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2018/03/20 10:54:10 UTC
svn commit: r1827292 [1/2] - in /jackrabbit/oak/trunk: ./ oak-segment-azure/
oak-segment-azure/src/ oak-segment-azure/src/main/
oak-segment-azure/src/main/java/ oak-segment-azure/src/main/java/org/
oak-segment-azure/src/main/java/org/apache/ oak-segmen...
Author: tomekr
Date: Tue Mar 20 10:54:09 2018
New Revision: 1827292
URL: http://svn.apache.org/viewvc?rev=1827292&view=rev
Log:
OAK-6922: Azure support for the segment-tar
Added:
jackrabbit/oak/trunk/oak-segment-azure/
jackrabbit/oak/trunk/oak-segment-azure/pom.xml
jackrabbit/oak/trunk/oak-segment-azure/src/
jackrabbit/oak/trunk/oak-segment-azure/src/main/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh (with props)
Modified:
jackrabbit/oak/trunk/pom.xml
Added: jackrabbit/oak/trunk/oak-segment-azure/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/pom.xml?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/pom.xml (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/pom.xml Tue Mar 20 10:54:09 2018
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-parent</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>../oak-parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>oak-segment-azure</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Oak Segment Azure</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package></Export-Package>
+ <Embed-Dependency>
+ azure-storage,
+ azure-keyvault-core
+ </Embed-Dependency>
+ </instructions>
+ </configuration>
+ <executions>
+ <execution>
+ <id>baseline</id>
+ <goals>
+ <goal>baseline</goal>
+ </goals>
+ <phase>pre-integration-test</phase>
+ <configuration>
+ <!--
+ This is required because there's no prior (stable) version of oak-segment-azure
+ This should be removed post first release
+ -->
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <!-- ====================================================================== -->
+ <!-- D E P E N D E N C I E S -->
+ <!-- ====================================================================== -->
+ <dependencies>
+ <!-- Optional OSGi dependencies, used only when running within OSGi -->
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.annotation</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.component.annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.metatype.annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Dependencies to other Oak components -->
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-tar</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-spi</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Azure Blob Storage dependency -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-storage</artifactId>
+ <version>5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-keyvault-core</artifactId>
+ <version>0.9.7</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-tar</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-spi</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.arakelian</groupId>
+ <artifactId>docker-junit-rule</artifactId>
+ <version>2.1.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CopyStatus;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getName;
+
+public class AzureArchiveManager implements SegmentArchiveManager {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureSegmentArchiveReader.class);
+
+ private final CloudBlobDirectory cloudBlobDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+ this.cloudBlobDirectory = cloudBlobDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = fileStoreMonitor;
+ }
+
+ @Override
+ public List<String> listArchives() throws IOException {
+ try {
+ return StreamSupport.stream(cloudBlobDirectory
+ .listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)
+ .spliterator(), false)
+ .filter(i -> i instanceof CloudBlobDirectory)
+ .map(i -> (CloudBlobDirectory) i)
+ .map(CloudBlobDirectory::getPrefix)
+ .map(Paths::get)
+ .map(Path::getFileName)
+ .map(Path::toString)
+ .collect(Collectors.toList());
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public SegmentArchiveReader open(String archiveName) throws IOException {
+ try {
+ CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
+ if (!archiveDirectory.getBlockBlobReference("closed").exists()) {
+ throw new IOException("The archive " + archiveName + " hasn't been closed correctly.");
+ }
+ return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor, monitor);
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public SegmentArchiveWriter create(String archiveName) throws IOException {
+ return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
+ }
+
+ @Override
+ public boolean delete(String archiveName) {
+ try {
+ getBlobs(archiveName)
+ .forEach(cloudBlob -> {
+ try {
+ cloudBlob.delete();
+ } catch (StorageException e) {
+ log.error("Can't delete segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ return true;
+ } catch (IOException e) {
+ log.error("Can't delete archive {}", archiveName, e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean renameTo(String from, String to) {
+ try {
+ CloudBlobDirectory targetDirectory = getDirectory(to);
+ getBlobs(from)
+ .forEach(cloudBlob -> {
+ try {
+ renameBlob(cloudBlob, targetDirectory);
+ } catch (IOException e) {
+ log.error("Can't rename segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ return true;
+ } catch (IOException e) {
+ log.error("Can't rename archive {} to {}", from, to, e);
+ return false;
+ }
+ }
+
+ @Override
+ public void copyFile(String from, String to) throws IOException {
+ CloudBlobDirectory targetDirectory = getDirectory(to);
+ getBlobs(from)
+ .forEach(cloudBlob -> {
+ try {
+ copyBlob(cloudBlob, targetDirectory);
+ } catch (IOException e) {
+ log.error("Can't copy segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ }
+
+ @Override
+ public boolean exists(String archiveName) {
+ try {
+ return listArchives().contains(archiveName);
+ } catch (IOException e) {
+ log.error("Can't check the existence of {}", archiveName, e);
+ return false;
+ }
+ }
+
+ @Override
+ public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
+ Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN);
+ List<RecoveredEntry> entryList = new ArrayList<>();
+
+ for (CloudBlob b : getBlobList(archiveName)) {
+ String name = getName(b);
+ Matcher m = pattern.matcher(name);
+ if (!m.matches()) {
+ continue;
+ }
+ int position = Integer.parseInt(m.group(1), 16);
+ UUID uuid = UUID.fromString(m.group(2));
+ long length = b.getProperties().getLength();
+ if (length > 0) {
+ byte[] data = new byte[(int) length];
+ try {
+ b.downloadToByteArray(data, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ entryList.add(new RecoveredEntry(position, uuid, data, name));
+ }
+ }
+ Collections.sort(entryList);
+
+ int i = 0;
+ for (RecoveredEntry e : entryList) {
+ if (e.position != i) {
+ log.warn("Missing entry {}.??? when recovering {}. No more segments will be read.", String.format("%04X", i), archiveName);
+ break;
+ }
+ log.info("Recovering segment {}/{}", archiveName, e.fileName);
+ entries.put(e.uuid, e.data);
+ i++;
+ }
+ }
+
+
+ private CloudBlobDirectory getDirectory(String archiveName) throws IOException {
+ try {
+ return cloudBlobDirectory.getDirectoryReference(archiveName);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Stream<CloudBlob> getBlobs(String archiveName) throws IOException {
+ return AzureUtilities.getBlobs(getDirectory(archiveName));
+ }
+
+ private List<CloudBlob> getBlobList(String archiveName) throws IOException {
+ return getBlobs(archiveName).collect(Collectors.toList());
+ }
+
+ private void renameBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+ copyBlob(blob, newParent);
+ try {
+ blob.delete();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void copyBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+ checkArgument(blob instanceof CloudBlockBlob, "Only page blobs are supported for the rename");
+ try {
+ String blobName = getName(blob);
+ CloudBlockBlob newBlob = newParent.getBlockBlobReference(blobName);
+ newBlob.startCopy(blob.getUri());
+ while (newBlob.getCopyState().getStatus() == CopyStatus.PENDING) {
+ Thread.sleep(100);
+ }
+
+ CopyStatus finalStatus = newBlob.getCopyState().getStatus();
+ if (newBlob.getCopyState().getStatus() != CopyStatus.SUCCESS) {
+ throw new IOException("Invalid copy status for " + blob.getUri().getPath() + ": " + finalStatus);
+ }
+ } catch (StorageException | InterruptedException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static class RecoveredEntry implements Comparable<RecoveredEntry> {
+
+ private final byte[] data;
+
+ private final UUID uuid;
+
+ private final int position;
+
+ private final String fileName;
+
+ public RecoveredEntry(int position, UUID uuid, byte[] data, String fileName) {
+ this.data = data;
+ this.uuid = uuid;
+ this.position = position;
+ this.fileName = fileName;
+ }
+
+ @Override
+ public int compareTo(RecoveredEntry o) {
+ return Integer.compare(this.position, o.position);
+ }
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public final class AzureBlobMetadata {
+
+ private static final String METADATA_TYPE = "type";
+
+ private static final String METADATA_SEGMENT_UUID = "segment-uuid";
+
+ private static final String METADATA_SEGMENT_POSITION = "segment-position";
+
+ private static final String METADATA_SEGMENT_GENERATION = "segment-generation";
+
+ private static final String METADATA_SEGMENT_FULL_GENERATION = "segment-fullGeneration";
+
+ private static final String METADATA_SEGMENT_COMPACTED = "segment-compacted";
+
+ private static final String TYPE_SEGMENT = "segment";
+
+ public static HashMap<String, String> toSegmentMetadata(AzureSegmentArchiveEntry indexEntry) {
+ HashMap<String, String> map = new HashMap<>();
+ map.put(METADATA_TYPE, TYPE_SEGMENT);
+ map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString());
+ map.put(METADATA_SEGMENT_POSITION, String.valueOf(indexEntry.getPosition()));
+ map.put(METADATA_SEGMENT_GENERATION, String.valueOf(indexEntry.getGeneration()));
+ map.put(METADATA_SEGMENT_FULL_GENERATION, String.valueOf(indexEntry.getFullGeneration()));
+ map.put(METADATA_SEGMENT_COMPACTED, String.valueOf(indexEntry.isCompacted()));
+ return map;
+ }
+
+ public static AzureSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
+ UUID uuid = UUID.fromString(metadata.get(METADATA_SEGMENT_UUID));
+ long msb = uuid.getMostSignificantBits();
+ long lsb = uuid.getLeastSignificantBits();
+ int position = Integer.parseInt(metadata.get(METADATA_SEGMENT_POSITION));
+ int generation = Integer.parseInt(metadata.get(METADATA_SEGMENT_GENERATION));
+ int fullGeneration = Integer.parseInt(metadata.get(METADATA_SEGMENT_FULL_GENERATION));
+ boolean compacted = Boolean.parseBoolean(metadata.get(METADATA_SEGMENT_COMPACTED));
+ return new AzureSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
+ }
+
+ public static boolean isSegment(Map<String, String> metadata) {
+ return metadata != null && TYPE_SEGMENT.equals(metadata.get(METADATA_TYPE));
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Charsets;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.List;
+
+public class AzureGCJournalFile implements GCJournalFile {
+
+ private final CloudAppendBlob gcJournal;
+
+ public AzureGCJournalFile(CloudAppendBlob gcJournal) {
+ this.gcJournal = gcJournal;
+ }
+
+ @Override
+ public void writeLine(String line) throws IOException {
+ try {
+ if (!gcJournal.exists()) {
+ gcJournal.createOrReplace();
+ }
+ gcJournal.appendText(line + "\n", Charsets.UTF_8.name(), null, null, null);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public List<String> readLines() throws IOException {
+ try {
+ if (!gcJournal.exists()) {
+ return Collections.emptyList();
+ }
+ byte[] data = new byte[(int) gcJournal.getProperties().getLength()];
+ gcJournal.downloadToByteArray(data, 0);
+ return IOUtils.readLines(new ByteArrayInputStream(data), Charset.defaultCharset());
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class AzureJournalFile implements JournalFile {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureJournalFile.class);
+
+ private static final int JOURNAL_LINE_LIMIT = Integer.getInteger("org.apache.jackrabbit.oak.segment.azure.journal.lines", 40_000);
+
+ private final CloudBlobDirectory directory;
+
+ private final String journalNamePrefix;
+
+ private final int lineLimit;
+
+ AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
+ this.directory = directory;
+ this.journalNamePrefix = journalNamePrefix;
+ this.lineLimit = lineLimit;
+ }
+
+ public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
+ this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
+ }
+
+ @Override
+ public JournalFileReader openJournalReader() throws IOException {
+ return new CombinedReader(getJournalBlobs());
+ }
+
+ @Override
+ public JournalFileWriter openJournalWriter() throws IOException {
+ return new AzureJournalWriter();
+ }
+
+ @Override
+ public String getName() {
+ return journalNamePrefix;
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return !getJournalBlobs().isEmpty();
+ } catch (IOException e) {
+ log.error("Can't check if the file exists", e);
+ return false;
+ }
+ }
+
+ private String getJournalFileName(int index) {
+ return String.format("%s.%03d", journalNamePrefix, index);
+ }
+
+ private List<CloudAppendBlob> getJournalBlobs() throws IOException {
+ try {
+ List<CloudAppendBlob> result = new ArrayList<>();
+ for (ListBlobItem b : directory.listBlobs(journalNamePrefix)) {
+ if (b instanceof CloudAppendBlob) {
+ result.add((CloudAppendBlob) b);
+ } else {
+ log.warn("Invalid blob type: {} {}", b.getUri(), b.getClass());
+ }
+ }
+ result.sort(Comparator.<CloudAppendBlob, String>comparing(AzureUtilities::getName).reversed());
+ return result;
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static class AzureJournalReader implements JournalFileReader {
+
+ private final CloudBlob blob;
+
+ private ReverseFileReader reader;
+
+ private AzureJournalReader(CloudBlob blob) {
+ this.blob = blob;
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ if (reader == null) {
+ try {
+ reader = new ReverseFileReader(blob);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ return reader.readLine();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ private class AzureJournalWriter implements JournalFileWriter {
+
+ private CloudAppendBlob currentBlob;
+
+ private int blockCount;
+
+ public AzureJournalWriter() throws IOException {
+ List<CloudAppendBlob> blobs = getJournalBlobs();
+ if (blobs.isEmpty()) {
+ try {
+ currentBlob = directory.getAppendBlobReference(getJournalFileName(1));
+ currentBlob.createOrReplace();
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ } else {
+ currentBlob = blobs.get(0);
+ }
+ Integer bc = currentBlob.getProperties().getAppendBlobCommittedBlockCount();
+ blockCount = bc == null ? 0 : bc;
+ }
+
+ @Override
+ public void truncate() throws IOException {
+ try {
+ for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
+ cloudAppendBlob.delete();
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeLine(String line) throws IOException {
+ if (blockCount >= lineLimit) {
+ createNewFile();
+ }
+ try {
+ currentBlob.appendText(line + "\n");
+ blockCount++;
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void createNewFile() throws IOException {
+ String name = AzureUtilities.getName(currentBlob);
+ Pattern pattern = Pattern.compile(Pattern.quote(journalNamePrefix) + "\\.(\\d+)" );
+ Matcher matcher = pattern.matcher(name);
+ int parsedSuffix;
+ if (matcher.find()) {
+ String suffix = matcher.group(1);
+ try {
+ parsedSuffix = Integer.parseInt(suffix);
+ } catch (NumberFormatException e) {
+ log.warn("Can't parse suffix for journal file {}", name);
+ parsedSuffix = 0;
+ }
+ } else {
+ log.warn("Can't parse journal file name {}", name);
+ parsedSuffix = 0;
+ }
+ try {
+ currentBlob = directory.getAppendBlobReference(getJournalFileName(parsedSuffix + 1));
+ currentBlob.createOrReplace();
+ blockCount = 0;
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ }
+
+ private static class CombinedReader implements JournalFileReader {
+
+ private final Iterator<AzureJournalReader> readers;
+
+ private JournalFileReader currentReader;
+
+ private CombinedReader(List<CloudAppendBlob> blobs) {
+ readers = blobs.stream().map(AzureJournalReader::new).iterator();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ String line;
+ do {
+ if (currentReader == null) {
+ if (!readers.hasNext()) {
+ return null;
+ }
+ currentReader = readers.next();
+ }
+ do {
+ line = currentReader.readLine();
+ } while ("".equals(line));
+ if (line == null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ } while (line == null);
+ return line;
+ }
+
+ @Override
+ public void close() throws IOException {
+ while (readers.hasNext()) {
+ readers.next().close();
+ }
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ }
+ }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public class AzureManifestFile implements ManifestFile {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureManifestFile.class);
+
+ private final CloudBlockBlob manifestBlob;
+
+ public AzureManifestFile(CloudBlockBlob manifestBlob) {
+ this.manifestBlob = manifestBlob;
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return manifestBlob.exists();
+ } catch (StorageException e) {
+ log.error("Can't check if the manifest exists", e);
+ return false;
+ }
+ }
+
+ @Override
+ public Properties load() throws IOException {
+ Properties properties = new Properties();
+ if (exists()) {
+ long length = manifestBlob.getProperties().getLength();
+ byte[] data = new byte[(int) length];
+ try {
+ manifestBlob.downloadToByteArray(data, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ properties.load(new ByteArrayInputStream(data));
+ }
+ return properties;
+ }
+
+ @Override
+ public void save(Properties properties) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ properties.store(bos, null);
+
+ byte[] data = bos.toByteArray();
+ try {
+ manifestBlob.uploadFromByteArray(data, 0, data.length);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.EnumSet;
+
+public class AzurePersistence implements SegmentNodeStorePersistence {
+
+ private static final Logger log = LoggerFactory.getLogger(AzurePersistence.class);
+
+ private final CloudBlobDirectory segmentstoreDirectory;
+
+ public AzurePersistence(CloudBlobDirectory segmentstoreDirectory) {
+ this.segmentstoreDirectory = segmentstoreDirectory;
+ }
+
+ @Override
+ public SegmentArchiveManager createArchiveManager(boolean mmap, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+ return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor);
+ }
+
+ @Override
+ public boolean segmentFilesExist() {
+ try {
+ for (ListBlobItem i : segmentstoreDirectory.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)) {
+ if (i instanceof CloudBlobDirectory) {
+ CloudBlobDirectory dir = (CloudBlobDirectory) i;
+ String name = Paths.get(dir.getPrefix()).getFileName().toString();
+ if (name.endsWith(".tar")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (StorageException | URISyntaxException e) {
+ log.error("Can't check if the segment archives exists", e);
+ return false;
+ }
+ }
+
+ @Override
+ public JournalFile getJournalFile() {
+ return new AzureJournalFile(segmentstoreDirectory, "journal.log");
+ }
+
+ @Override
+ public GCJournalFile getGCJournalFile() throws IOException {
+ return new AzureGCJournalFile(getAppendBlob("gc.log"));
+ }
+
+ @Override
+ public ManifestFile getManifestFile() throws IOException {
+ return new AzureManifestFile(getBlockBlob("manifest"));
+ }
+
+ @Override
+ public RepositoryLock lockRepository() throws IOException {
+ return new AzureRepositoryLock(getBlockBlob("repo.lock"), new Runnable() {
+ @Override
+ public void run() {
+ log.warn("Lost connection to the Azure. The client will be closed.");
+ // TODO close the connection
+ }
+ }).lock();
+ }
+
+ private CloudBlockBlob getBlockBlob(String path) throws IOException {
+ try {
+ return segmentstoreDirectory.getBlockBlobReference(path);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private CloudAppendBlob getAppendBlob(String path) throws IOException {
+ try {
+ return segmentstoreDirectory.getAppendBlobReference(path);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class AzureRepositoryLock implements RepositoryLock {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);
+
+ private static int INTERVAL = 60;
+
+ private final Runnable shutdownHook;
+
+ private final CloudBlockBlob blob;
+
+ private final ExecutorService executor;
+
+ private String leaseId;
+
+ private volatile boolean doUpdate;
+
+ public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
+ this.shutdownHook = shutdownHook;
+ this.blob = blob;
+ this.executor = Executors.newSingleThreadExecutor();
+ }
+
+ public AzureRepositoryLock lock() throws IOException {
+ try {
+ blob.openOutputStream().close();
+ leaseId = blob.acquireLease(INTERVAL, null);
+ log.info("Acquired lease {}", leaseId);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ executor.submit(this::refreshLease);
+ return this;
+ }
+
+ private void refreshLease() {
+ doUpdate = true;
+ long lastUpdate = 0;
+ while (doUpdate) {
+ try {
+ long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
+ if (timeSinceLastUpdate > INTERVAL / 2) {
+ blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+ lastUpdate = System.currentTimeMillis();
+ }
+ } catch (StorageException e) {
+ log.error("Can't renew the lease", e);
+ shutdownHook.run();
+ doUpdate = false;
+ return;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ log.error("Interrupted the lease renewal loop", e);
+ }
+ }
+ }
+
+ @Override
+ public void unlock() throws IOException {
+ doUpdate = false;
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ releaseLease();
+ }
+ }
+
+ private void releaseLease() throws IOException {
+ try {
+ blob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+ blob.delete();
+ log.info("Released lease {}", leaseId);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
+
+public class AzureSegmentArchiveEntry implements IndexEntry {
+
+ private final long msb;
+
+ private final long lsb;
+
+ private final int position;
+
+ private final int length;
+
+ private final int generation;
+
+ private final int fullGeneration;
+
+ private final boolean compacted;
+
+ public AzureSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) {
+ this.msb = msb;
+ this.lsb = lsb;
+ this.position = position;
+ this.length = length;
+ this.generation = generation;
+ this.fullGeneration = fullGeneration;
+ this.compacted = compacted;
+ }
+
+ @Override
+ public long getMsb() {
+ return msb;
+ }
+
+ @Override
+ public long getLsb() {
+ return lsb;
+ }
+
+ @Override
+ public int getPosition() {
+ return position;
+ }
+
+ @Override
+ public int getLength() {
+ return length;
+ }
+
+ @Override
+ public int getGeneration() {
+ return generation;
+ }
+
+ @Override
+ public int getFullGeneration() {
+ return fullGeneration;
+ }
+
+ @Override
+ public boolean isCompacted() {
+ return compacted;
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveReader implements SegmentArchiveReader {
+
+ private final CloudBlobDirectory archiveDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ private final long length;
+
+ private final Map<UUID, AzureSegmentArchiveEntry> index = new LinkedHashMap<>();
+
+ private Boolean hasGraph;
+
+ AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) throws IOException {
+ this.archiveDirectory = archiveDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = monitor;
+ long length = 0;
+ try {
+ for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory).collect(Collectors.toList())) {
+ blob.downloadAttributes();
+ Map<String, String> metadata = blob.getMetadata();
+ if (AzureBlobMetadata.isSegment(metadata)) {
+ AzureSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
+ index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+ }
+ length += blob.getProperties().getLength();
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ this.length = length;
+ }
+
+ @Override
+ public ByteBuffer readSegment(long msb, long lsb) throws IOException {
+ AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
+ ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength());
+ ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
+ return buffer;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ return index.containsKey(new UUID(msb, lsb));
+ }
+
+ @Override
+ public List<SegmentArchiveEntry> listSegments() {
+ return new ArrayList<>(index.values());
+ }
+
+ @Override
+ public ByteBuffer getGraph() throws IOException {
+ ByteBuffer graph = readBlob(getName() + ".gph");
+ hasGraph = graph != null;
+ return graph;
+ }
+
+ @Override
+ public boolean hasGraph() {
+ if (hasGraph == null) {
+ try {
+ getGraph();
+ } catch (IOException ignore) { }
+ }
+ return hasGraph;
+ }
+
+ @Override
+ public ByteBuffer getBinaryReferences() throws IOException {
+ return readBlob(getName() + ".brf");
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public int getEntrySize(int size) {
+ return size;
+ }
+
+ private File pathAsFile() {
+ return new File(archiveDirectory.getUri().getPath());
+ }
+
+ private CloudBlockBlob getBlob(String name) throws IOException {
+ try {
+ return archiveDirectory.getBlockBlobReference(name);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private ByteBuffer readBlob(String name) throws IOException {
+ try {
+ CloudBlockBlob blob = getBlob(name);
+ if (!blob.exists()) {
+ return null;
+ }
+ long length = blob.getProperties().getLength();
+ ByteBuffer buffer = ByteBuffer.allocate((int) length);
+ AzureUtilities.readBufferFully(blob, buffer);
+ return buffer;
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveWriter implements SegmentArchiveWriter {
+
+ private final CloudBlobDirectory archiveDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ private final Optional<SegmentWriteQueue> queue;
+
+ private Map<UUID, AzureSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
+
+ private int entries;
+
+ private long totalLength;
+
+ private volatile boolean created = false;
+
+ public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+ this.archiveDirectory = archiveDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = monitor;
+ this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty();
+ }
+
+ @Override
+ public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration, boolean compacted) throws IOException {
+ created = true;
+
+ AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted);
+ if (queue.isPresent()) {
+ queue.get().addToQueue(entry, data, offset, size);
+ } else {
+ doWriteEntry(entry, data, offset, size);
+ }
+ index.put(new UUID(msb, lsb), entry);
+
+ totalLength += size;
+ monitor.written(size);
+ }
+
+ private void doWriteEntry(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+ long msb = indexEntry.getMsb();
+ long lsb = indexEntry.getLsb();
+ ioMonitor.beforeSegmentWrite(pathAsFile(), msb, lsb, size);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ CloudBlockBlob blob = getBlob(getSegmentFileName(indexEntry));
+ blob.setMetadata(AzureBlobMetadata.toSegmentMetadata(indexEntry));
+ blob.uploadFromByteArray(data, offset, size);
+ blob.uploadMetadata();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ ioMonitor.afterSegmentWrite(pathAsFile(), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ }
+
+ @Override
+ public ByteBuffer readSegment(long msb, long lsb) throws IOException {
+ UUID uuid = new UUID(msb, lsb);
+ Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+ if (segment.isPresent()) {
+ return segment.get().toByteBuffer();
+ }
+ AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
+ if (indexEntry == null) {
+ return null;
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength());
+ readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
+ return buffer;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ UUID uuid = new UUID(msb, lsb);
+ Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+ if (segment.isPresent()) {
+ return true;
+ }
+ return index.containsKey(new UUID(msb, lsb));
+ }
+
+ @Override
+ public void writeGraph(byte[] data) throws IOException {
+ writeDataFile(data, ".gph");
+ }
+
+ @Override
+ public void writeBinaryReferences(byte[] data) throws IOException {
+ writeDataFile(data, ".brf");
+ }
+
+ private void writeDataFile(byte[] data, String extension) throws IOException {
+ try {
+ getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ totalLength += data.length;
+ monitor.written(data.length);
+ }
+
+ @Override
+ public long getLength() {
+ return totalLength;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (queue.isPresent()) { // required to handle IOException
+ SegmentWriteQueue q = queue.get();
+ q.flush();
+ q.close();
+ }
+ try {
+ getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean isCreated() {
+ return created || !queueIsEmpty();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (queue.isPresent()) { // required to handle IOException
+ queue.get().flush();
+ }
+ }
+
+ private boolean queueIsEmpty() {
+ return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+ }
+
+ @Override
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
+ }
+
+ private File pathAsFile() {
+ return new File(archiveDirectory.getUri().getPath());
+ }
+
+ private CloudBlockBlob getBlob(String name) throws IOException {
+ try {
+ return archiveDirectory.getBlockBlobReference(name);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+
+@Component(
+ configurationPolicy = ConfigurationPolicy.REQUIRE,
+ configurationPid = {Configuration.PID})
+public class AzureSegmentStoreService {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureSegmentStoreService.class);
+
+ public static final String DEFAULT_CONTAINER_NAME = "oak";
+
+ public static final String DEFAULT_ROOT_PATH = "/oak";
+
+ private ServiceRegistration registration;
+
+ private SegmentNodeStorePersistence persistence;
+
+ @Activate
+ public void activate(ComponentContext context, Configuration config) throws IOException {
+ persistence = createAzurePersistence(config);
+ registration = context.getBundleContext().registerService(SegmentNodeStorePersistence.class.getName(), persistence, new Properties());
+ }
+
+ @Deactivate
+ public void deactivate() throws IOException {
+ if (registration != null) {
+ registration.unregister();
+ registration = null;
+ }
+ persistence = null;
+ }
+
+ private static SegmentNodeStorePersistence createAzurePersistence(Configuration configuration) throws IOException {
+ try {
+ StringBuilder connectionString = new StringBuilder();
+ if (configuration.connectionURL() != null) {
+ connectionString.append(configuration.connectionURL());
+ } else {
+ connectionString.append("DefaultEndpointsProtocol=https;");
+ connectionString.append("AccountName=").append(configuration.accountName()).append(';');
+ connectionString.append("AccountKey=").append(configuration.accessKey()).append(';');
+ }
+ log.info("Connection string: {}", connectionString.toString());
+ CloudStorageAccount cloud = CloudStorageAccount.parse(connectionString.toString());
+ CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(configuration.containerName());
+ container.createIfNotExists();
+
+ String path = configuration.rootPath();
+ if (path != null && path.length() > 0 && path.charAt(0) == '/') {
+ path = path.substring(1);
+ }
+
+ AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference(path));
+ return persistence;
+ } catch (StorageException | URISyntaxException | InvalidKeyException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
+
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public final class AzureUtilities {
+
+ public static String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
+
+ private AzureUtilities() {
+ }
+
+ public static String getSegmentFileName(AzureSegmentArchiveEntry indexEntry) {
+ return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
+ }
+
+ public static String getSegmentFileName(long offset, long msb, long lsb) {
+ return String.format("%04x.%s", offset, new UUID(msb, lsb).toString());
+ }
+
+ public static String getName(CloudBlob blob) {
+ return Paths.get(blob.getName()).getFileName().toString();
+ }
+
+ public static String getName(CloudBlobDirectory directory) {
+ return Paths.get(directory.getUri().getPath()).getFileName().toString();
+ }
+
+ public static Stream<CloudBlob> getBlobs(CloudBlobDirectory directory) throws IOException {
+ try {
+ return StreamSupport.stream(directory.listBlobs().spliterator(), false)
+ .filter(i -> i instanceof CloudBlob)
+ .map(i -> (CloudBlob) i);
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static long getDirectorySize(CloudBlobDirectory directory) throws IOException {
+ long size = 0;
+ for (CloudBlob b : getBlobs(directory).collect(Collectors.toList())) {
+ try {
+ b.downloadAttributes();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ size += b.getProperties().getLength();
+ }
+ return size;
+ }
+
+ public static void readBufferFully(CloudBlob blob, ByteBuffer buffer) throws IOException {
+ try {
+ buffer.rewind();
+ long readBytes = blob.downloadToByteArray(buffer.array(), 0);
+ if (buffer.limit() != readBytes) {
+ throw new IOException("Buffer size: " + buffer.limit() + ", read bytes: " + readBytes);
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+import static org.apache.jackrabbit.oak.segment.azure.Configuration.PID;
+
+@ObjectClassDefinition(
+ pid = {PID},
+ name = "Apache Jackrabbit Oak Azure Segment Store Service",
+ description = "Azure backend for the Oak Segment Node Store")
+@interface Configuration {
+
+ String PID = "org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService";
+
+ @AttributeDefinition(
+ name = "Azure account name",
+ description = "Name of the Azure Storage account to use.")
+ String accountName();
+
+ @AttributeDefinition(
+ name = "Azure container name",
+ description = "Name of the container to use. If it doesn't exists, it'll be created.")
+ String containerName() default AzureSegmentStoreService.DEFAULT_CONTAINER_NAME;
+
+ @AttributeDefinition(
+ name = "Azure account access key",
+ description = "Access key which should be used to authenticate on the account")
+ String accessKey();
+
+ @AttributeDefinition(
+ name = "Root path",
+ description = "Names of all the created blobs will be prefixed with this path")
+ String rootPath() default AzureSegmentStoreService.DEFAULT_ROOT_PATH;
+
+ @AttributeDefinition(
+ name = "Azure connection URL (optional)",
+ description = "Connection URL to be used to connect to the Azure Storage. " +
+ "Setting it will override the accountName, containerName and accessKey properties.")
+ String connectionURL();
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java Tue Mar 20 10:54:09 2018
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.Math.min;
+
+public class ReverseFileReader {
+
+ private static final int BUFFER_SIZE = 16 * 1024;
+
+ private int bufferSize;
+
+ private final CloudBlob blob;
+
+ private byte[] buffer;
+
+ private int bufferOffset;
+
+ private int fileOffset;
+
+ public ReverseFileReader(CloudBlob blob) throws StorageException {
+ this(blob, BUFFER_SIZE);
+ }
+
+ public ReverseFileReader(CloudBlob blob, int bufferSize) throws StorageException {
+ this.blob = blob;
+ if (blob.exists()) {
+ this.fileOffset = (int) blob.getProperties().getLength();
+ } else {
+ this.fileOffset = 0;
+ }
+ this.bufferSize = bufferSize;
+ }
+
+ private void readBlock() throws IOException {
+ if (buffer == null) {
+ buffer = new byte[min(fileOffset, bufferSize)];
+ } else if (fileOffset < buffer.length) {
+ buffer = new byte[fileOffset];
+ }
+
+ if (buffer.length > 0) {
+ fileOffset -= buffer.length;
+ try {
+ blob.downloadRangeToByteArray(fileOffset, Long.valueOf(buffer.length), buffer, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ bufferOffset = buffer.length;
+ }
+
+ private String readUntilNewLine() {
+ if (bufferOffset == -1) {
+ return "";
+ }
+ int stop = bufferOffset;
+ while (--bufferOffset >= 0) {
+ if (buffer[bufferOffset] == '\n') {
+ break;
+ }
+ }
+ // bufferOffset points either the previous '\n' character or -1
+ return new String(buffer, bufferOffset + 1, stop - bufferOffset - 1, Charset.defaultCharset());
+ }
+
+ public String readLine() throws IOException {
+ if (bufferOffset == -1 && fileOffset == 0) {
+ return null;
+ }
+
+ if (buffer == null) {
+ readBlock();
+ }
+
+ List<String> result = new ArrayList<>(1);
+ while (true) {
+ result.add(readUntilNewLine());
+ if (bufferOffset > -1) { // stopped on the '\n'
+ break;
+ }
+ if (fileOffset == 0) { // reached the beginning of the file
+ break;
+ }
+ readBlock();
+ }
+ Collections.reverse(result);
+ return String.join("", result);
+ }
+}