You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by tf...@apache.org on 2023/07/17 20:37:32 UTC

[solr] branch branch_9x updated: SOLR-16886: Don't commit multi-part uploads that have been aborted (#1773)

This is an automated email from the ASF dual-hosted git repository.

tflobbe pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 0f66a6bb0a8 SOLR-16886: Don't commit multi-part uploads that have been aborted (#1773)
0f66a6bb0a8 is described below

commit 0f66a6bb0a81342741b90bccccd10f04c47802dc
Author: Tomas Eduardo Fernandez Lobbe <tf...@apache.org>
AuthorDate: Mon Jul 17 13:29:45 2023 -0700

    SOLR-16886: Don't commit multi-part uploads that have been aborted (#1773)
---
 solr/CHANGES.txt                                   |   3 +-
 solr/modules/s3-repository/build.gradle            |   7 ++
 .../java/org/apache/solr/s3/S3OutputStream.java    |  21 +++-
 .../apache/solr/s3/S3OutputStreamMockitoTest.java  | 130 +++++++++++++++++++++
 4 files changed, 158 insertions(+), 3 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 43ff2797b11..f87a5802e56 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -21,7 +21,8 @@ Optimizations
 
 Bug Fixes
 ---------------------
-(No changes)
+
+* SOLR-16886: Don't commit multi-part uploads that have been aborted (Tomás Fernández Löbbe, Houston Putman)
 
 Dependency Upgrades
 ---------------------
diff --git a/solr/modules/s3-repository/build.gradle b/solr/modules/s3-repository/build.gradle
index 8fad1158a37..a22ca5dc7f8 100644
--- a/solr/modules/s3-repository/build.gradle
+++ b/solr/modules/s3-repository/build.gradle
@@ -79,6 +79,13 @@ dependencies {
   testImplementation 'commons-io:commons-io'
 
   testRuntimeOnly 'org.eclipse.jetty:jetty-webapp'
+
+  testImplementation('org.mockito:mockito-core', {
+    exclude group: "net.bytebuddy", module: "byte-buddy-agent"
+  })
+  testRuntimeOnly('org.mockito:mockito-subclass', {
+    exclude group: "net.bytebuddy", module: "byte-buddy-agent"
+  })
 }
 
 test {
diff --git a/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java b/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java
index 9a46f45101e..25bf3465e7f 100644
--- a/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java
+++ b/solr/modules/s3-repository/src/java/org/apache/solr/s3/S3OutputStream.java
@@ -163,14 +163,19 @@ public class S3OutputStream extends OutputStream {
       return;
     }
 
+    if (multiPartUpload != null && multiPartUpload.aborted) {
+      multiPartUpload = null;
+      closed = true;
+      return;
+    }
+
     // flush first
     uploadPart();
 
     if (multiPartUpload != null) {
       multiPartUpload.complete();
-      multiPartUpload = null;
     }
-
+    multiPartUpload = null;
     closed = true;
   }
 
@@ -186,6 +191,7 @@ public class S3OutputStream extends OutputStream {
   private class MultipartUpload {
     private final String uploadId;
     private final List<CompletedPart> completedParts;
+    private boolean aborted = false;
 
     public MultipartUpload(String uploadId) {
       this.uploadId = uploadId;
@@ -200,6 +206,10 @@ public class S3OutputStream extends OutputStream {
     }
 
     void uploadPart(ByteArrayInputStream inputStream, long partSize) {
+      if (aborted) {
+        throw new IllegalStateException(
+            "Can't upload new parts on a MultipartUpload that was aborted. id '" + uploadId + "'");
+      }
       int currentPartNumber = completedParts.size() + 1;
 
       UploadPartRequest request =
@@ -221,6 +231,10 @@ public class S3OutputStream extends OutputStream {
 
     /** To be invoked when closing the stream to mark upload is done. */
     void complete() {
+      if (aborted) {
+        throw new IllegalStateException(
+            "Can't complete a MultipartUpload that was aborted. id '" + uploadId + "'");
+      }
       if (log.isDebugEnabled()) {
         log.debug("Completing multi-part upload for key '{}', id '{}'", key, uploadId);
       }
@@ -242,6 +256,9 @@ public class S3OutputStream extends OutputStream {
         // ignoring failure on abort.
         log.error("Unable to abort multipart upload, you may need to purge uploaded parts: ", e);
       }
+      // Even if the abort operation failed, we consider this MultiPartUpload aborted,
+      // and we'll not try to complete it.
+      aborted = true;
     }
   }
 }
diff --git a/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3OutputStreamMockitoTest.java b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3OutputStreamMockitoTest.java
new file mode 100644
index 00000000000..f0bcd196975
--- /dev/null
+++ b/solr/modules/s3-repository/src/test/org/apache/solr/s3/S3OutputStreamMockitoTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.s3;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.function.Consumer;
+import org.apache.solr.SolrTestCaseJ4;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+
+public class S3OutputStreamMockitoTest extends SolrTestCaseJ4 {
+
+  private S3Client clientMock;
+
+  private static byte[] largeBuffer;
+
+  @BeforeClass
+  public static void setUpClass() {
+    assumeWorkingMockito();
+    String content =
+        RandomStrings.randomAsciiAlphanumOfLength(random(), S3OutputStream.PART_SIZE + 1024);
+    largeBuffer = content.getBytes(StandardCharsets.UTF_8);
+    // pre-check -- ensure that our test string isn't too small
+    assertTrue(largeBuffer.length > S3OutputStream.PART_SIZE);
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    largeBuffer = null;
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    clientMock = mock(S3Client.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testMultiPartUploadCompleted() throws IOException {
+    when(clientMock.createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any()))
+        .thenReturn(CreateMultipartUploadResponse.builder().build());
+    when(clientMock.uploadPart((UploadPartRequest) any(), (RequestBody) any()))
+        .thenReturn(UploadPartResponse.builder().build());
+    S3OutputStream stream = new S3OutputStream(clientMock, "key", "bucket");
+    stream.write(largeBuffer);
+    verify(clientMock)
+        .createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any());
+    verify(clientMock).uploadPart((UploadPartRequest) any(), (RequestBody) any());
+    verify(clientMock, never())
+        .completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
+    verify(clientMock, never())
+        .abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());
+
+    stream.close();
+    verify(clientMock)
+        .completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
+    verify(clientMock, never())
+        .abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testMultiPartUploadAborted() throws IOException {
+    when(clientMock.createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any()))
+        .thenReturn(CreateMultipartUploadResponse.builder().build());
+    when(clientMock.uploadPart((UploadPartRequest) any(), (RequestBody) any()))
+        .thenThrow(S3Exception.builder().message("fake exception").build());
+    S3OutputStream stream = new S3OutputStream(clientMock, "key", "bucket");
+    // first time it should throw the exception from S3Client
+    org.apache.solr.s3.S3Exception solrS3Exception =
+        assertThrows(org.apache.solr.s3.S3Exception.class, () -> stream.write(largeBuffer));
+    assertEquals(S3Exception.class, solrS3Exception.getCause().getClass());
+    assertEquals("fake exception", solrS3Exception.getCause().getMessage());
+    verify(clientMock).abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());
+
+    // after that, the exception should be because the MPU is aborted
+    solrS3Exception =
+        assertThrows(org.apache.solr.s3.S3Exception.class, () -> stream.write(largeBuffer));
+    assertEquals(IllegalStateException.class, solrS3Exception.getCause().getClass());
+    assertTrue(
+        "Unexpected exception message: " + solrS3Exception.getCause().getMessage(),
+        solrS3Exception
+            .getCause()
+            .getMessage()
+            .contains("Can't upload new parts on a MultipartUpload that was aborted"));
+
+    verify(clientMock)
+        .createMultipartUpload((Consumer<CreateMultipartUploadRequest.Builder>) any());
+    verify(clientMock).uploadPart((UploadPartRequest) any(), (RequestBody) any());
+    verify(clientMock, never())
+        .completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
+    verify(clientMock, times(2))
+        .abortMultipartUpload((Consumer<AbortMultipartUploadRequest.Builder>) any());
+
+    stream.close();
+    verify(clientMock, never())
+        .completeMultipartUpload((Consumer<CompleteMultipartUploadRequest.Builder>) any());
+  }
+}