You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/31 17:51:25 UTC

[incubator-pulsar] branch master updated: When offloading to S3 add version information (#1876)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77d936e  When offloading to S3 add version information (#1876)
77d936e is described below

commit 77d936e65b1bf6a8f6fc52e1a51670851fe90e20
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu May 31 19:51:22 2018 +0200

    When offloading to S3 add version information (#1876)
    
    When we offload ledgers to S3, we create 2 objects, an index and a
    data object. This patch adds version information for the format, the
    version of the software and the gitsha of the software to each, for
    debugging purposes.
    
    On reads, if the format version is a version higher than the broker
    knows about, the read is rejected.
    
    Master Issue: #1511
---
 .../broker/s3offload/S3ManagedLedgerOffloader.java | 36 ++++++++++-
 .../s3offload/impl/S3BackedInputStreamImpl.java    |  7 +++
 .../s3offload/impl/S3BackedReadHandleImpl.java     |  5 ++
 .../broker/s3offload/S3BackedInputStreamTest.java  | 20 +++++--
 .../s3offload/S3ManagedLedgerOffloaderTest.java    | 69 ++++++++++++++++++++++
 5 files changed, 131 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index dcfa9e8..73bb78a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.google.common.base.Strings;
 import java.io.InputStream;
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +46,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
 import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl;
+import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,20 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
     private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
 
     public static final String DRIVER_NAME = "S3";
+
+    static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion";
+    static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
+    static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
+    static final String CURRENT_VERSION = String.valueOf(1);
+
+    private final VersionCheck VERSION_CHECK = (key, metadata) -> {
+        String version = metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY);
+        if (version == null || !version.equals(CURRENT_VERSION)) {
+            throw new IOException(String.format("Invalid object version %s for %s, expect %s",
+                                                version, key, CURRENT_VERSION));
+        }
+    };
+
     private final OrderedScheduler scheduler;
     private final AmazonS3 s3client;
     private final String bucket;
@@ -124,7 +140,11 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                 .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
-            InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, new ObjectMetadata());
+
+            ObjectMetadata dataMetadata = new ObjectMetadata();
+            addVersionInfo(dataMetadata);
+
+            InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata);
             InitiateMultipartUploadResult dataBlockRes = null;
 
             // init multi part upload for data block.
@@ -195,6 +215,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                 // write the index block
                 ObjectMetadata metadata = new ObjectMetadata();
                 metadata.setContentLength(indexStream.getStreamSize());
+                addVersionInfo(metadata);
+
                 s3client.putObject(new PutObjectRequest(
                     bucket,
                     indexBlockKey,
@@ -225,6 +247,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                     promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
                                                                  s3client,
                                                                  bucket, key, indexKey,
+                                                                 VERSION_CHECK,
                                                                  ledgerId, readBufferSize));
                 } catch (Throwable t) {
                     promise.completeExceptionally(t);
@@ -233,6 +256,13 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
         return promise;
     }
 
+    private static void addVersionInfo(ObjectMetadata metadata) {
+        metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION);
+        metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY,
+                                       PulsarBrokerVersionStringUtils.getNormalizedVersionString());
+        metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha());
+    }
+
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
@@ -249,6 +279,10 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
 
         return promise;
     }
+
+    public interface VersionCheck {
+        void check(String key, ObjectMetadata md) throws IOException;
+    }
 }
 
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
index 0c5e3df..912a1d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
@@ -30,6 +30,8 @@ import java.io.InputStream;
 import java.io.IOException;
 
 import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
+import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,7 @@ public class S3BackedInputStreamImpl extends S3BackedInputStream {
     private final AmazonS3 s3client;
     private final String bucket;
     private final String key;
+    private final VersionCheck versionCheck;
     private final ByteBuf buffer;
     private final long objectLen;
     private final int bufferSize;
@@ -46,10 +49,12 @@ public class S3BackedInputStreamImpl extends S3BackedInputStream {
     private long cursor;
 
     public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String key,
+                                   VersionCheck versionCheck,
                                    long objectLen, int bufferSize) {
         this.s3client = s3client;
         this.bucket = bucket;
         this.key = key;
+        this.versionCheck = versionCheck;
         this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
         this.objectLen = objectLen;
         this.bufferSize = bufferSize;
@@ -72,6 +77,8 @@ public class S3BackedInputStreamImpl extends S3BackedInputStream {
                 .withRange(startRange, endRange);
             log.debug("Reading range {}-{} from {}/{}", startRange, endRange, bucket, key);
             try (S3Object obj = s3client.getObject(req)) {
+                versionCheck.check(key, obj.getObjectMetadata());
+
                 Long[] range = obj.getObjectMetadata().getContentRange();
                 long bytesRead = range[1] - range[0] + 1;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
index 984af59..65acbb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
 import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
 import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
 import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
+import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -190,14 +191,18 @@ public class S3BackedReadHandleImpl implements ReadHandle {
 
     public static ReadHandle open(ScheduledExecutorService executor,
                                   AmazonS3 s3client, String bucket, String key, String indexKey,
+                                  VersionCheck versionCheck,
                                   long ledgerId, int readBufferSize)
             throws AmazonClientException, IOException {
         GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
         try (S3Object obj = s3client.getObject(req)) {
+            versionCheck.check(indexKey, obj.getObjectMetadata());
+
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
             OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
 
             S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
+                                                                          versionCheck,
                                                                           index.getDataObjectLength(),
                                                                           readBufferSize);
             return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
index 1155c2b..9f06ee8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
@@ -91,7 +91,9 @@ class S3BackedInputStreamTest extends S3TestBase {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
         assertStreamsMatch(toTest, toCompare);
     }
 
@@ -106,13 +108,17 @@ class S3BackedInputStreamTest extends S3TestBase {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
         assertStreamsMatchByBytes(toTest, toCompare);
     }
 
     @Test(expectedExceptions = IOException.class)
     public void testErrorOnS3Read() throws Exception {
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", 1234, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist",
+                                                                 (key, md) -> {},
+                                                                 1234, 1000);
         toTest.read();
     }
 
@@ -136,7 +142,9 @@ class S3BackedInputStreamTest extends S3TestBase {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
         for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) {
             toTest.seek(e.getKey());
             assertStreamsMatch(toTest, e.getValue());
@@ -153,7 +161,9 @@ class S3BackedInputStreamTest extends S3TestBase {
         metadata.setContentLength(objectSize);
         s3client.putObject(BUCKET, objectKey, toWrite, metadata);
 
-        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000);
+        S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey,
+                                                                 (key, md) -> {},
+                                                                 objectSize, 1000);
 
         // seek forward to middle
         long middle = objectSize/2;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index 1b5ad03..ab78f07 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -25,8 +25,11 @@ import static org.mockito.Matchers.anyLong;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Method;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Random;
@@ -447,5 +450,71 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
             Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
         }
     }
+
+    @Test
+    public void testReadUnknownDataVersion() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid);
+        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey);
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md));
+
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
+            toRead.readAsync(0, 0).get();
+            Assert.fail("Shouldn't have been able to read");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md));
+
+        try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
+            toRead.readAsync(0, 0).get();
+            Assert.fail("Shouldn't have been able to read");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+    }
+
+    @Test
+    public void testReadUnknownIndexVersion() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler,
+                                                                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid);
+        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey);
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md));
+
+        try {
+            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            Assert.fail("Shouldn't have been able to open");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+
+        md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345));
+        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md));
+
+        try {
+            offloader.readOffloaded(toWrite.getId(), uuid).get();
+            Assert.fail("Shouldn't have been able to open");
+        } catch (ExecutionException e) {
+            Assert.assertEquals(e.getCause().getClass(), IOException.class);
+            Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version"));
+        }
+    }
 }
 

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.