You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/18 11:55:11 UTC

[flink] branch master updated: [FLINK-11151][rest] Create parent directories in FileUploadHandler

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 84b6616  [FLINK-11151][rest] Create parent directories in FileUploadHandler
84b6616 is described below

commit 84b66165b2be083da1ddece3053244bbda8c3eee
Author: zentol <ch...@apache.org>
AuthorDate: Thu Dec 13 15:37:13 2018 +0100

    [FLINK-11151][rest] Create parent directories in FileUploadHandler
---
 .../org/apache/flink/runtime/rest/FileUploadHandler.java  |  4 ++++
 .../apache/flink/runtime/rest/FileUploadHandlerTest.java  | 15 +++++++++++++++
 .../flink/runtime/rest/MultipartUploadResource.java       |  4 ++++
 3 files changed, 23 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index 7c46af0..b99de66 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -102,6 +102,10 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> {
 						checkState(currentUploadDir == null);
 						currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
 						currentHttpRequest = ReferenceCountUtil.retain(httpRequest);
+
+						// make sure that we still have an upload dir in case it got deleted in the meantime
+						RestServerEndpoint.createUploadDir(uploadDir, LOG);
+
 						currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
 					} else {
 						ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
index 858c662..771fd8a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.runtime.io.network.netty.NettyLeakDetectionResource;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -124,6 +125,20 @@ public class FileUploadHandlerTest extends TestLogger {
 	}
 
 	@Test
+	public void testUploadDirectoryRegeneration() throws Exception {
+		OkHttpClient client = new OkHttpClient();
+
+		MultipartUploadResource.MultipartFileHandler fileHandler = MULTIPART_UPLOAD_RESOURCE.getFileHandler();
+
+		FileUtils.deleteDirectory(MULTIPART_UPLOAD_RESOURCE.getUploadDirectory().toFile());
+
+		Request fileRequest = buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL());
+		try (Response response = client.newCall(fileRequest).execute()) {
+			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
+		}
+	}
+
+	@Test
 	public void testMixedMultipart() throws Exception {
 		OkHttpClient client = new OkHttpClient();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 22de8a1..65690c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -163,6 +163,10 @@ public class MultipartUploadResource extends ExternalResource {
 		return jsonHandler;
 	}
 
+	public Path getUploadDirectory() {
+		return configuredUploadDir;
+	}
+
 	public void resetState() {
 		mixedHandler.lastReceivedRequest = null;
 		jsonHandler.lastReceivedRequest = null;