Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/12/22 06:15:41 UTC

[GitHub] [hadoop] pranavsaxena-microsoft commented on a diff in pull request #4039: HADOOP-18146: ABFS: Added changes for expect hundred continue header

pranavsaxena-microsoft commented on code in PR #4039:
URL: https://github.com/apache/hadoop/pull/4039#discussion_r1055102136


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java:
##########
@@ -681,35 +685,36 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
         abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = new AbfsRestOperation(
-        AbfsRestOperationType.Append,
-        this,
-        HTTP_METHOD_PUT,
-        url,
-        requestHeaders,
-        buffer,
-        reqParams.getoffset(),
-        reqParams.getLength(),
-        sasTokenForReuse);
+    final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
+        HTTP_METHOD_PUT, url, requestHeaders, buffer, reqParams.getoffset(),
+        reqParams.getLength(), sasTokenForReuse);
     try {
       op.execute(tracingContext);
     } catch (AzureBlobFileSystemException e) {
+      // If the http response code indicates a user error we retry
+      // the same append request with expect header disabled.
+      // When "100-continue" header is enabled but a non Http 100 response comes,
+      // JDK fails to provide all response headers.

Review Comment:
   Please elaborate on, or cite a source for, `JDK fails to provide all response headers.`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
     if (this.isTraceEnabled) {
       startTime = System.nanoTime();
     }
-    try (OutputStream outputStream = this.connection.getOutputStream()) {
-      // update bytes sent before they are sent so we may observe
-      // attempted sends as well as successful sends via the
-      // accompanying statusCode
+    OutputStream outputStream = null;
+    // Updates the expected bytes to be sent based on length.
+    this.expectedBytesToBeSent = length;
+    try {
+      try {
+        /* Without expect header enabled, if getOutputStream() throws
+           an exception, it gets caught by the restOperation. But with
+           expect header enabled we return back without throwing an exception
+           for the correct response code processing.
+         */
+        outputStream = this.connection.getOutputStream();
+      } catch (IOException e) {
+        /* If getOutputStream fails with an exception and expect header
+           is enabled, we return back without throwing an exception to
+           the caller. The caller is responsible for setting the correct status code.
+           If expect header is not enabled, we throw back the exception.
+         */
+        String expectHeader = this.connection.getRequestProperty(EXPECT);
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+          LOG.debug("Getting output stream failed with expect header enabled, returning back "
+                  + ExceptionUtils.getStackTrace(e));

Review Comment:
   `LOG.debug("message", exceptionObj)` also prints the stack trace, so `ExceptionUtils.getStackTrace(e)` is not needed here.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -229,14 +229,23 @@ private void completeExecute(TracingContext tracingContext)
       }
     }
 
-    if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {

Review Comment:
   Please add a comment explaining why an exception is thrown for status < 200.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
     if (this.isTraceEnabled) {
       startTime = System.nanoTime();
     }
-    try (OutputStream outputStream = this.connection.getOutputStream()) {
-      // update bytes sent before they are sent so we may observe
-      // attempted sends as well as successful sends via the
-      // accompanying statusCode
+    OutputStream outputStream = null;
+    // Updates the expected bytes to be sent based on length.
+    this.expectedBytesToBeSent = length;
+    try {
+      try {
+        /* Without expect header enabled, if getOutputStream() throws
+           an exception, it gets caught by the restOperation. But with
+           expect header enabled we return back without throwing an exception
+           for the correct response code processing.
+         */
+        outputStream = this.connection.getOutputStream();
+      } catch (IOException e) {
+        /* If getOutputStream fails with an exception and expect header
+           is enabled, we return back without throwing an exception to
+           the caller. The caller is responsible for setting the correct status code.
+           If expect header is not enabled, we throw back the exception.
+         */
+        String expectHeader = this.connection.getRequestProperty(EXPECT);
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+          LOG.debug("Getting output stream failed with expect header enabled, returning back "
+                  + ExceptionUtils.getStackTrace(e));
+          return;

Review Comment:
   After sendRequest completes, AbfsRestOperation calls httpOperation.processResponse. If the request was never sent, what do we expect from connection.getInputStream, connection.getResponseCode, and connection.getHeaderField?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java:
##########
@@ -315,12 +324,20 @@ private boolean executeHttpOperation(final int retryCount,
       }
 
       if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
-        throw new InvalidAbfsRestOperationException(ex);
+        throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
 
       return false;
     } finally {
-      intercept.updateMetrics(operationType, httpOperation);
+      int status = httpOperation.getStatusCode();
+      // If the socket is terminated prior to receiving a response, the HTTP

Review Comment:
   Please cite a source for the HTTP status being 0 or -1.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java:
##########
@@ -30,14 +30,33 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
+
+  private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";
+
   public InvalidAbfsRestOperationException(
       final Exception innerException) {
     super(
         AzureServiceErrorCode.UNKNOWN.getStatusCode(),
         AzureServiceErrorCode.UNKNOWN.getErrorCode(),
         innerException != null
             ? innerException.toString()
-            : "InvalidAbfsRestOperationException",
+            : ERROR_MESSAGE,

Review Comment:
   Should we use this.getClass().getName() here instead of a hard-coded string?
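
   One caveat with that idea: the message is built inside the arguments of `super(...)`, and Java does not allow `this` to be referenced there, so `this.getClass().getName()` would not compile in that position. A minimal sketch of an alternative using a class literal follows. The classes `BaseOperationException`, `InvalidOperationException`, and `ErrorMessageDemo` are hypothetical stand-ins, not the ABFS classes.

```java
import java.io.IOException;

class ErrorMessageDemo {
  // Helper for the demo: builds the exception and returns its message.
  static String messageFor(Exception inner) {
    return new InvalidOperationException(inner).getMessage();
  }

  public static void main(String[] args) {
    System.out.println(messageFor(null));
    System.out.println(messageFor(new IOException("boom")));
  }
}

// Hypothetical stand-in for the parent exception type.
class BaseOperationException extends Exception {
  BaseOperationException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for the exception under review.
class InvalidOperationException extends BaseOperationException {
  // A class literal needs no instance, so it can feed the super(...) call.
  // `this.getClass()` cannot be used there because `this` is not yet
  // available before the superclass constructor has run.
  private static final String ERROR_MESSAGE =
      InvalidOperationException.class.getSimpleName();

  InvalidOperationException(Exception innerException) {
    super(innerException != null ? innerException.toString() : ERROR_MESSAGE);
  }
}
```

   With a class literal the fallback message tracks a rename of the class, which a hard-coded string does not.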



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -314,13 +327,46 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
     if (this.isTraceEnabled) {
       startTime = System.nanoTime();
     }
-    try (OutputStream outputStream = this.connection.getOutputStream()) {
-      // update bytes sent before they are sent so we may observe
-      // attempted sends as well as successful sends via the
-      // accompanying statusCode
+    OutputStream outputStream = null;
+    // Updates the expected bytes to be sent based on length.
+    this.expectedBytesToBeSent = length;
+    try {
+      try {
+        /* Without expect header enabled, if getOutputStream() throws
+           an exception, it gets caught by the restOperation. But with
+           expect header enabled we return back without throwing an exception
+           for the correct response code processing.
+         */
+        outputStream = this.connection.getOutputStream();
+      } catch (IOException e) {
+        /* If getOutputStream fails with an exception and expect header
+           is enabled, we return back without throwing an exception to
+           the caller. The caller is responsible for setting the correct status code.
+           If expect header is not enabled, we throw back the exception.
+         */
+        String expectHeader = this.connection.getRequestProperty(EXPECT);
+        if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+          LOG.debug("Getting output stream failed with expect header enabled, returning back "
+                  + ExceptionUtils.getStackTrace(e));
+          return;
+        } else {
+          LOG.debug("Getting output stream failed without expect header enabled, throwing exception "
+                  + ExceptionUtils.getStackTrace(e));

Review Comment:
   `LOG.debug("message", exceptionObj)` also prints the stack trace, so `ExceptionUtils.getStackTrace(e)` is not needed here.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java:
##########
@@ -404,4 +431,152 @@ public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
   public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
     return client.getTokenProvider();
   }
+
+  /**
+   * Test helper method to get random bytes array.
+   * @param length The length of byte buffer.
+   * @return byte buffer.
+   */
+  private byte[] getRandomBytesArray(int length) {
+    final byte[] b = new byte[length];
+    new Random().nextBytes(b);
+    return b;
+  }
+
+  /**
+   * Test to verify that client retries append request without
+   * expect header enabled if append with expect header enabled fails
+   * with 4xx kind of error.
+   * @throws Exception
+   */
+  @Test
+  public void testExpectHundredContinue() throws Exception {
+    // Get the filesystem.
+    final AzureBlobFileSystem fs = getFileSystem();
+
+    final Configuration configuration = new Configuration();
+    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
+    AbfsClient abfsClient = fs.getAbfsStore().getClient();
+
+    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
+        configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
+
+    // Update the configuration with reduced retry count and reduced backoff interval.
+    AbfsConfiguration abfsConfig
+        = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
+        abfsConfiguration,
+        REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
+
+    // Gets the client.
+    AbfsClient testClient = Mockito.spy(
+        TestAbfsClient.createTestClientFromCurrentContext(
+            abfsClient,
+            abfsConfig));
+
+    // Create the append request params with expect header enabled initially.
+    AppendRequestParameters appendRequestParameters
+        = new AppendRequestParameters(
+        BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
+        AppendRequestParameters.Mode.APPEND_MODE, false, null, true);
+
+    byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
+
+    // Create a test container to upload the data.
+    Path testPath = path(TEST_PATH);
+    fs.create(testPath);
+    String finalTestPath = testPath.toString()
+        .substring(testPath.toString().lastIndexOf("/"));
+
+    // Creates a list of request headers.
+    final List<AbfsHttpHeader> requestHeaders
+        = TestAbfsClient.getTestRequestHeaders(testClient);
+    requestHeaders.add(
+        new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
+    }
+
+    // Updates the query parameters.
+    final AbfsUriQueryBuilder abfsUriQueryBuilder
+        = testClient.createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
+        Long.toString(appendRequestParameters.getPosition()));
+
+    // Creates the url for the specified path.
+    URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
+
+    // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
+    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.Append,
+        testClient,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders, buffer,
+        appendRequestParameters.getoffset(),
+        appendRequestParameters.getLength(), null));
+
+    AbfsHttpOperation abfsHttpOperation = new AbfsHttpOperation(url,
+        HTTP_METHOD_PUT, requestHeaders);
+
+    // Create a mock of UrlConnection class.
+    HttpURLConnection urlConnection = mock(HttpURLConnection.class);
+
+    // Sets the expect request property if expect header is enabled.
+    if (appendRequestParameters.isExpectHeaderEnabled()) {
+      Mockito.doReturn(HUNDRED_CONTINUE).when(urlConnection)
+          .getRequestProperty(EXPECT);
+    }
+    Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
+        .any(), Mockito.any());
+    Mockito.doReturn(url).when(urlConnection).getURL();
+
+    // Give user error code 404 when processResponse is called.
+    Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
+    Mockito.doReturn(HTTP_NOT_FOUND).when(urlConnection).getResponseCode();
+    Mockito.doReturn("Resource Not Found")
+        .when(urlConnection)
+        .getResponseMessage();
+
+    // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
+    Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+        .when(urlConnection)
+        .getOutputStream();
+    abfsHttpOperation.setConnection(urlConnection);

Review Comment:
   Let's not add a setter for the connection in AbfsHttpOperation. Instead, we can:
   
   1. Make abfsHttpOperation a spied object.
   2. Add package-private methods to AbfsHttpOperation, getConnectionRequestMethod(), getConnectionResponseCode(), and getConnectionResponseMessage(), which in the real object delegate to the corresponding connection methods. The spied object would then stub them with the mocked behaviour:
    - getConnectionResponseCode: return HTTP_NOT_FOUND.
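
   The shape of that suggestion can be sketched without Mockito by overriding the package-private seams in an anonymous subclass, which is roughly what stubbing a spy achieves. This is only an illustration under stated assumptions: `HttpOperationSketch` and `SpySeamDemo` are hypothetical names, and the real AbfsHttpOperation has more state and behaviour than shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;

class SpySeamDemo {
  // Stands in for a Mockito spy: only the seam methods are replaced.
  static HttpOperationSketch stubbedNotFound() {
    return new HttpOperationSketch() {
      @Override
      int getConnectionResponseCode() {
        return HttpURLConnection.HTTP_NOT_FOUND;
      }

      @Override
      String getConnectionResponseMessage() {
        return "Resource Not Found";
      }
    };
  }

  // Runs response processing against the stubbed operation.
  static String run() {
    HttpOperationSketch op = stubbedNotFound();
    try {
      op.processResponse();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return op.getStatusCode() + " " + op.getStatusDescription();
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}

// Hypothetical, heavily reduced stand-in for AbfsHttpOperation.
class HttpOperationSketch {
  private HttpURLConnection connection; // never opened in this sketch
  private int statusCode;
  private String statusDescription;

  // Package-private seams: production code delegates to the connection.
  int getConnectionResponseCode() throws IOException {
    return connection.getResponseCode();
  }

  String getConnectionResponseMessage() throws IOException {
    return connection.getResponseMessage();
  }

  // Reads the response only through the seams, so no setter is needed.
  void processResponse() throws IOException {
    statusCode = getConnectionResponseCode();
    statusDescription = getConnectionResponseMessage();
  }

  int getStatusCode() {
    return statusCode;
  }

  String getStatusDescription() {
    return statusDescription;
  }
}
```

   Because the test never touches the connection field, the production class does not need a setConnection method.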



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -110,6 +115,10 @@ protected  HttpURLConnection getConnection() {
     return connection;
   }
 
+  void setConnection(final HttpURLConnection connection) {
+    this.connection = connection;
+  }
+

Review Comment:
   As suggested in the TestAbfsClient comment, let's remove this setter.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java:
##########
@@ -126,7 +132,7 @@ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final
    */
   public boolean shouldRetry(final int retryCount, final int statusCode) {
     return retryCount < this.retryCount
-        && (statusCode == -1
+        && (statusCode < HTTP_CONTINUE

Review Comment:
   Please cite a reference for why we retry on this range of status codes.
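
   To make the widened range concrete, here is a small sketch comparing only the first clause of the status check before and after the change. It assumes `HTTP_CONTINUE` is 100; the remaining clauses of `shouldRetry` are not visible in this hunk and are left out. `RetryRangeDemo` and its method names are hypothetical.

```java
class RetryRangeDemo {
  // Assumed value of the HTTP_CONTINUE constant used in the diff.
  static final int HTTP_CONTINUE = 100;

  // First clause of the status check before the change.
  static boolean retriedBefore(int statusCode) {
    return statusCode == -1;
  }

  // First clause of the status check after the change.
  static boolean retriedAfter(int statusCode) {
    return statusCode < HTTP_CONTINUE;
  }

  public static void main(String[] args) {
    int[] samples = {-1, 0, 99, 100, 404};
    for (int status : samples) {
      System.out.println(status + ": before=" + retriedBefore(status)
          + ", after=" + retriedAfter(status));
    }
  }
}
```

   Under that assumption, the new clause also matches 0 and every value from 1 to 99, none of which the old `== -1` comparison matched.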



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org