You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/03/18 13:19:21 UTC
[hadoop] branch trunk updated: HADOOP-16182. Update abfs storage
back-end with "close" flag when application is done writing to a file.
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1edf191 HADOOP-16182. Update abfs storage back-end with "close" flag when application is done writing to a file.
1edf191 is described below
commit 1edf1914acb74e45f6717c703f519cb382aae173
Author: Vishwajeet Dusane <vd...@microsoft.com>
AuthorDate: Mon Mar 18 13:18:08 2019 +0000
HADOOP-16182. Update abfs storage back-end with "close" flag when application is done writing to a file.
Contributed by Vishwajeet Dusane.
---
.../fs/azurebfs/constants/HttpQueryParams.java | 1 +
.../hadoop/fs/azurebfs/services/AbfsClient.java | 3 ++-
.../fs/azurebfs/services/AbfsOutputStream.java | 21 +++++++++++----------
3 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
index 2f2f546..87e074f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java
@@ -35,6 +35,7 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_POSITION = "position";
public static final String QUERY_PARAM_TIMEOUT = "timeout";
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
+ public static final String QUERY_PARAM_CLOSE = "close";
public static final String QUERY_PARAM_UPN = "upn";
private HttpQueryParams() {}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index c634c35..c29543f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -300,7 +300,7 @@ public class AbfsClient implements Closeable {
return op;
}
- public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData)
+ public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, boolean isClose)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
@@ -312,6 +312,7 @@ public class AbfsClient implements Closeable {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
+ abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 85db774..56fe0b1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -200,7 +200,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
@Override
public void hsync() throws IOException {
if (supportFlush) {
- flushInternal();
+ flushInternal(false);
}
}
@@ -211,7 +211,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
@Override
public void hflush() throws IOException {
if (supportFlush) {
- flushInternal();
+ flushInternal(false);
}
}
@@ -230,7 +230,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
}
try {
- flushInternal();
+ flushInternal(true);
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
@@ -244,10 +244,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
}
}
- private synchronized void flushInternal() throws IOException {
+ private synchronized void flushInternal(boolean isClose) throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
- flushWrittenBytesToService();
+ flushWrittenBytesToService(isClose);
}
private synchronized void flushInternalAsync() throws IOException {
@@ -288,7 +288,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
shrinkWriteOperationQueue();
}
- private synchronized void flushWrittenBytesToService() throws IOException {
+ private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
@@ -306,21 +306,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
throw lastError;
}
}
- flushWrittenBytesToServiceInternal(position, false);
+ flushWrittenBytesToServiceInternal(position, false, isClose);
}
private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
shrinkWriteOperationQueue();
if (this.lastTotalAppendOffset > this.lastFlushOffset) {
- this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true);
+ this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
+ false/*Async flush on close not permitted*/);
}
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset,
- final boolean retainUncommitedData) throws IOException {
+ final boolean retainUncommitedData, final boolean isClose) throws IOException {
try {
- client.flush(path, offset, retainUncommitedData);
+ client.flush(path, offset, retainUncommitedData, isClose);
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org