You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/03/18 13:19:21 UTC

[hadoop] branch trunk updated: HADOOP-16182. Update abfs storage back-end with "close" flag when application is done writing to a file.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1edf191  HADOOP-16182. Update abfs storage back-end with "close" flag when application is done writing to a file.
1edf191 is described below

commit 1edf1914acb74e45f6717c703f519cb382aae173
Author: Vishwajeet Dusane <vd...@microsoft.com>
AuthorDate: Mon Mar 18 13:18:08 2019 +0000

    HADOOP-16182. Update abfs storage back-end with "close" flag when application is done writing to a file.
    
    Contributed by Vishwajeet Dusane.
---
 .../fs/azurebfs/constants/HttpQueryParams.java      |  1 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java     |  3 ++-
 .../fs/azurebfs/services/AbfsOutputStream.java      | 21 +++++++++++----------
 3 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index 2f2f546..87e074f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -35,6 +35,7 @@ public final class HttpQueryParams {
   public static final String QUERY_PARAM_POSITION = "position";
   public static final String QUERY_PARAM_TIMEOUT = "timeout";
   public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
+  public static final String QUERY_PARAM_CLOSE = "close";
   public static final String QUERY_PARAM_UPN = "upn";
 
   private HttpQueryParams() {}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index c634c35..c29543f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -300,7 +300,7 @@ public class AbfsClient implements Closeable {
     return op;
   }
 
-  public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData)
+  public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, boolean isClose)
       throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     // JDK7 does not support PATCH, so to workaround the issue we will use
@@ -312,6 +312,7 @@ public class AbfsClient implements Closeable {
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 85db774..56fe0b1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -200,7 +200,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   @Override
   public void hsync() throws IOException {
     if (supportFlush) {
-      flushInternal();
+      flushInternal(false);
     }
   }
 
@@ -211,7 +211,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   @Override
   public void hflush() throws IOException {
     if (supportFlush) {
-      flushInternal();
+      flushInternal(false);
     }
   }
 
@@ -230,7 +230,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     }
 
     try {
-      flushInternal();
+      flushInternal(true);
       threadExecutor.shutdown();
     } finally {
       lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
@@ -244,10 +244,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     }
   }
 
-  private synchronized void flushInternal() throws IOException {
+  private synchronized void flushInternal(boolean isClose) throws IOException {
     maybeThrowLastError();
     writeCurrentBufferToService();
-    flushWrittenBytesToService();
+    flushWrittenBytesToService(isClose);
   }
 
   private synchronized void flushInternalAsync() throws IOException {
@@ -288,7 +288,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     shrinkWriteOperationQueue();
   }
 
-  private synchronized void flushWrittenBytesToService() throws IOException {
+  private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
     for (WriteOperation writeOperation : writeOperations) {
       try {
         writeOperation.task.get();
@@ -306,21 +306,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
         throw lastError;
       }
     }
-    flushWrittenBytesToServiceInternal(position, false);
+    flushWrittenBytesToServiceInternal(position, false, isClose);
   }
 
   private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
     shrinkWriteOperationQueue();
 
     if (this.lastTotalAppendOffset > this.lastFlushOffset) {
-      this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true);
+      this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
+        false/*Async flush on close not permitted*/);
     }
   }
 
   private synchronized void flushWrittenBytesToServiceInternal(final long offset,
-      final boolean retainUncommitedData) throws IOException {
+      final boolean retainUncommitedData, final boolean isClose) throws IOException {
     try {
-      client.flush(path, offset, retainUncommitedData);
+      client.flush(path, offset, retainUncommitedData, isClose);
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
         if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org