You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/31 17:51:24 UTC

[GitHub] sijie closed pull request #1876: When offloading to S3 add version information

sijie closed pull request #1876: When offloading to S3 add version information
URL: https://github.com/apache/incubator-pulsar/pull/1876
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display the original diff once a pull request from a fork has been merged):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index dcfa9e88a1..73bb78a140 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -33,6 +33,7 @@
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.google.common.base.Strings;
 import java.io.InputStream;
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +46,7 @@
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
 import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl;
+import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,20 @@
     private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
 
     public static final String DRIVER_NAME = "S3";
+
+    static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";
+    static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
+    static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
+    static final String CURRENT_VERSION = String.valueOf(1);
+
+    private final VersionCheck VERSION_CHECK = (key, metadata) -> {
+        String version = metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY);
+        if (version == null || !version.equals(CURRENT_VERSION)) {
+            throw new IOException(String.format("Invalid object version %s for %s, expect %s",
+                                                version, key, CURRENT_VERSION));
+        }
+    };
+
     private final OrderedScheduler scheduler;
     private final AmazonS3 s3client;
     private final String bucket;
@@ -124,7 +140,11 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
                 .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
-            InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, new ObjectMetadata());
+
+            ObjectMetadata dataMetadata = new ObjectMetadata();
+            addVersionInfo(dataMetadata);
+
+            InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata);
             InitiateMultipartUploadResult dataBlockRes = null;
 
             // init multi part upload for data block.
@@ -195,6 +215,8 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
                 // write the index block
                 ObjectMetadata metadata = new ObjectMetadata();
                 metadata.setContentLength(indexStream.getStreamSize());
+                addVersionInfo(metadata);
+
                 s3client.putObject(new PutObjectRequest(
                     bucket,
                     indexBlockKey,
@@ -225,6 +247,7 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
                     promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
                                                                  s3client,
                                                                  bucket, key, indexKey,
+                                                                 VERSION_CHECK,
                                                                  ledgerId, readBufferSize));
                 } catch (Throwable t) {
                     promise.completeExceptionally(t);
@@ -233,6 +256,13 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
         return promise;
     }
 
+    private static void addVersionInfo(ObjectMetadata metadata) {
+        metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION);
+        metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY,
+                                       PulsarBrokerVersionStringUtils.getNormalizedVersionString());
+        metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha());
+    }
+
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
@@ -249,6 +279,10 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) {
 
         return promise;
     }
+
+    public interface VersionCheck {
+        void check(String key, ObjectMetadata md) throws IOException;
+    }
 }
 
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
index 0c5e3df497..912a1d514b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
@@ -30,6 +30,8 @@
 import java.io.IOException;
 
 import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
+import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,7 @@
     private final AmazonS3 s3client;
     private final String bucket;
     private final String key;
+    private final VersionCheck versionCheck;
     private final ByteBuf buffer;
     private final long objectLen;
     private final int bufferSize;
@@ -46,10 +49,12 @@
     private long cursor;
 
     public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String key,
+                                   VersionCheck versionCheck,
                                    long objectLen, int bufferSize) {
         this.s3client = s3client;
         this.bucket = bucket;
         this.key = key;
+        this.versionCheck = versionCheck;
         this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
         this.objectLen = objectLen;
         this.bufferSize = bufferSize;
@@ -72,6 +77,8 @@ private boolean refillBufferIfNeeded() throws IOException {
                 .withRange(startRange, endRange);
             log.debug("Reading range {}-{} from {}/{}", startRange, endRange, bucket, key);
             try (S3Object obj = s3client.getObject(req)) {
+                versionCheck.check(key, obj.getObjectMetadata());
+
                 Long[] range = obj.getObjectMetadata().getContentRange();
                 long bytesRead = range[1] - range[0] + 1;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
index 984af597a4..65acbb848b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
 import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
 import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
+import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -190,14 +191,18 @@ public boolean isClosed() {
 
     public static ReadHandle open(ScheduledExecutorService executor,
                                   AmazonS3 s3client, String bucket, String key, String indexKey,
+                                  VersionCheck versionCheck,
                                   long ledgerId, int readBufferSize)
             throws AmazonClientException, IOException {
         GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
         try (S3Object obj = s3client.getObject(req)) {
+            versionCheck.check(indexKey, obj.getObjectMetadata());
+
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
             OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
 
             S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
+                                                                          versionCheck,
                                                                           index.getDataObjectLength(),
                                                                           readBufferSize);
             return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
index 1155c2be9d..9f06ee8674 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
@@ -91,7 +91,9 @@ public void testReadingFullObject() throws Exception {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
         assertStreamsMatch(toTest, toCompare);
     }
 
@@ -106,13 +108,17 @@ public void testReadingFullObjectByBytes() throws Exception {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
         assertStreamsMatchByBytes(toTest, toCompare);
     }
 
     @Test(expectedExceptions = IOException.class)
     public void testErrorOnS3Read() throws Exception {
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", 1234, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist",
+                                                                 (key, md) -> {},
+                                                                 1234, 1000);
         toTest.read();
     }
 
@@ -136,7 +142,9 @@ public void testSeek() throws Exception {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
         for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) {
             toTest.seek(e.getKey());
             assertStreamsMatch(toTest, e.getValue());
@@ -153,7 +161,9 @@ public void testSeekForward() throws Exception {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
 
         // seek forward to middle
         long middle = objectSize/2;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 1b5ad0399b..ab78f07eb2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -25,8 +25,11 @@
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Method;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Random;
@@ -447,5 +450,71 @@ public void testOffloadEmpty() throws Exception {
             Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
         }
     }
+
+    @Test
+    public void testReadUnknownDataVersion() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid);
+        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey);
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md));
+
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
+            toRead.readAsync(0, 0).get();
+            Assert.fail("Shouldn't have been able to read");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md));
+
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
+            toRead.readAsync(0, 0).get();
+            Assert.fail("Shouldn't have been able to read");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+    }
+
+    @Test
+    public void testReadUnknownIndexVersion() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid);
+        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey);
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md));
+
+        try {
+            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            Assert.fail("Shouldn't have been able to open");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md));
+
+        try {
+            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            Assert.fail("Shouldn't have been able to open");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+    }
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services