Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/06/18 13:21:50 UTC

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/6178

    [FLINK-9599][rest] Implement generic mechanism to access uploaded files

    ## What is the purpose of the change
    
    This PR extends the existing multipart handling to also support mixed multipart messages (i.e. requests containing both JSON and files), and generalizes the `FileUpload` handling to provide access to all handlers extending `AbstractHandler`.
    
    Handlers may access uploaded files via `HandlerRequest#getFileUploads`. File uploads must be explicitly allowed by returning true in `MessageHeaders#acceptsFileUploads`.
    The files and JSON payload are forwarded to the handler by the `FileUploadHandler` via channel attributes. If a JSON payload is forwarded this way, a handler will ignore the content of the received `HttpRequest`.
    
    This PR only covers the server side; the `RestClient` remains unchanged and will be extended in a follow-up.
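
The opt-in rule described above (uploads are rejected unless `acceptsFileUploads` returns true) can be sketched in isolation. This is a minimal illustration, not the code from the PR: `UploadGateSketch`, `UploadAwareHeaders` and `checkUploads` are hypothetical stand-ins for `MessageHeaders` and the handler logic, and plain integer status codes replace Netty's `HttpResponseStatus`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;

/** Sketch of the opt-in gate for file uploads; all types here are simplified stand-ins. */
final class UploadGateSketch {

    /** Stand-in for MessageHeaders: uploads are rejected unless a handler opts in. */
    interface UploadAwareHeaders {
        default boolean acceptsFileUploads() {
            return false;
        }
    }

    /** Returns 400 if files were uploaded to a handler that did not opt in, otherwise 200. */
    static int checkUploads(UploadAwareHeaders headers, Collection<Path> uploadedFiles) {
        if (!headers.acceptsFileUploads() && !uploadedFiles.isEmpty()) {
            return 400; // corresponds to BAD_REQUEST, "File uploads not allowed."
        }
        return 200;
    }

    public static void main(String[] args) {
        // Uses the default: file uploads are not accepted.
        UploadAwareHeaders plain = new UploadAwareHeaders() {};
        // Explicitly opts in to file uploads.
        UploadAwareHeaders optedIn = new UploadAwareHeaders() {
            @Override
            public boolean acceptsFileUploads() {
                return true;
            }
        };
        Collection<Path> oneFile = Collections.singleton(Paths.get("/tmp/upload/job.jar"));

        System.out.println(checkUploads(plain, Collections.emptyList()));
        System.out.println(checkUploads(plain, oneFile));
        System.out.println(checkUploads(optedIn, oneFile));
    }
}
```

Defaulting to `false` means existing handlers keep rejecting unexpected uploads without any change on their side.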
    
    ## Brief change log
    * add `MessageHeaders#acceptsFileUploads` to signal that a handler accepts file uploads (default=false)
    * add `FileUploads` class as a container for uploaded files
    * extend `FileUploadHandler` to
      * accept multiple files in one request
      * also accept a JSON payload
      * forward both files and json via channel attributes
    * extend `AbstractHandler` to retrieve files/json from channel attributes
      * remove special case for `JarRunHandler` in `AbstractHandler`
    * extend `HandlerRequest` to accept a `FileUploads` object
    * add `OkHttp` dependency to `flink-runtime` for testing purposes
    * update `JarUploadHandler/Headers`
    
    ## Verifying this change
    
    added tests:
    * FileUploadsTest
    * FileUploadHandlerTest
    * manually verified via WebUI job submission
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9280_beta

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6178
    
----
commit c2adb3880180d8426697c33035d2b31d57d93952
Author: zentol <ch...@...>
Date:   2018-06-18T08:54:42Z

    [FLINK-9599][rest] Implement generic mechanism to access uploaded files

----


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196739865
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		final Collection<Path> directories = new ArrayList<>(1);
    +		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +			if (Files.isDirectory(fileOrDirectory)) {
    +				directories.add(fileOrDirectory);
    +				FileAdderVisitor visitor = new FileAdderVisitor();
    +				Files.walkFileTree(fileOrDirectory, visitor);
    +				files.addAll(visitor.getContainedFiles());
    +			} else {
    +				files.add(fileOrDirectory);
    +			}
    +		}
    +		directoriesToClean = Collections.unmodifiableCollection(directories);
    +		uploadedFiles = Collections.unmodifiableCollection(files);
    --- End diff --
    
    I think it is better to move this logic out of the constructor. Logic in a constructor always makes testing difficult.
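
One way to read this suggestion is a static factory that does the directory walk, leaving a constructor that only assigns fields. The following is a rough sketch under that assumption, not the change that was made in the PR: `UploadsSketch` is a hypothetical stand-in for `FileUploads`, and it uses `Files.walk` instead of the visitor from the diff to stay short.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for FileUploads: the constructor only assigns, the factory does the I/O. */
final class UploadsSketch {
    private final Collection<Path> files;

    // Trivial constructor: tests can pass any collection without touching the file system.
    UploadsSketch(Collection<Path> files) {
        this.files = Collections.unmodifiableCollection(files);
    }

    /** Walks the directory and collects every regular file below it. */
    static UploadsSketch forDirectory(Path directory) throws IOException {
        if (!directory.isAbsolute() || !Files.isDirectory(directory)) {
            throw new IllegalArgumentException("Path must be an absolute directory: " + directory);
        }
        // Files.walk returns a lazily populated stream that must be closed.
        try (Stream<Path> paths = Files.walk(directory)) {
            List<Path> found = paths.filter(Files::isRegularFile).collect(Collectors.toList());
            return new UploadsSketch(found);
        }
    }

    Collection<Path> getFiles() {
        return files;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("uploads-sketch");
        Files.createFile(dir.resolve("a.jar"));
        Files.createDirectory(dir.resolve("nested"));
        Files.createFile(dir.resolve("nested").resolve("b.txt"));

        // Prints the number of regular files found below the temporary directory.
        System.out.println(forDirectory(dir).getFiles().size());
    }
}
```

With this split, a unit test of the container itself needs no temporary directory; only the factory touches the disk.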


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196667307
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
     		currentHttpPostRequestDecoder.destroy();
     		currentHttpPostRequestDecoder = null;
     		currentHttpRequest = null;
    +		currentUploadDir = null;
    +		currentJsonPayload = null;
    +	}
    +
    +	public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +	}
    +
    +	public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
    +			.orElse(FileUploads.EMPTY);
    --- End diff --
    
    How files are stored is an implementation detail of the `FileUploadHandler`; why would we expose this to subsequent handlers?


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196090706
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -52,7 +57,10 @@
     
    --- End diff --
    
    should maybe rename the class to `MultipartRequestHandler`


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196696248
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
     		currentHttpPostRequestDecoder.destroy();
     		currentHttpPostRequestDecoder = null;
     		currentHttpRequest = null;
    +		currentUploadDir = null;
    +		currentJsonPayload = null;
    +	}
    +
    +	public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +	}
    +
    +	public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
    +			.orElse(FileUploads.EMPTY);
    --- End diff --
    
    How does this differ from the current implementation? Are you suggesting to simplify `FileUploads` to only consider the case of 1 directory containing N files?


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196739603
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---
    @@ -70,9 +70,14 @@ protected AbstractRestHandler(
     	}
     
     	@Override
    -	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) throws RestHandlerException {
    +	protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
     		CompletableFuture<P> response;
     
    +		if (!messageHeaders.acceptsFileUploads() && !handlerRequest.getUploadedFiles().isEmpty()) {
    +			processRestHandlerException(ctx, httpRequest, new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST));
    +			return;
    +		}
    --- End diff --
    
    Shouldn't this be moved to the `AbstractHandler`?


---

[GitHub] flink issue #6178: [FLINK-9599][rest] Implement generic mechanism to access ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6178
  
    will change the logging and merge afterwards.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196555570
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		final Collection<Path> directories = new ArrayList<>(1);
    +		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +			if (Files.isDirectory(fileOrDirectory)) {
    +				directories.add(fileOrDirectory);
    +				FileAdderVisitor visitor = new FileAdderVisitor();
    +				Files.walkFileTree(fileOrDirectory, visitor);
    +				files.addAll(visitor.get());
    +			} else {
    +				files.add(fileOrDirectory);
    +			}
    +		}
    +		directoriesToClean = Collections.unmodifiableCollection(directories);
    +		uploadedFiles = Collections.unmodifiableCollection(files);
    +	}
    +
    +	public Collection<Path> getUploadedFiles() {
    +		return uploadedFiles;
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		for (Path file : uploadedFiles) {
    +			try {
    +				Files.delete(file);
    +			} catch (FileNotFoundException ignored) {
    +				// file may have been moved by a handler
    +			}
    +		}
    +		for (Path directory : directoriesToClean) {
    +			Files.walkFileTree(directory, CleanupFileVisitor.get());
    +		}
    +	}
    +
    +	private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
    +
    +		private final Collection<Path> files = new ArrayList<>(4);
    +
    +		Collection<Path> get() {
    --- End diff --
    
    Maybe use a more descriptive name than `get`.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196453980
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java ---
    @@ -129,4 +137,9 @@ public R getRequestBody() {
     			return queryParameter.getValue();
     		}
     	}
    +
    +	@Nonnull
    +	public FileUploads getFileUploads() {
    +		return uploadedFiles;
    +	}
    --- End diff --
    
    I would not expose `FileUploads` to the user but rather return a `Collection<File>`.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197050662
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.router.RouteResult;
    +import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
    +import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Collections;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for {@link AbstractHandler}.
    + */
    +public class AbstractHandlerTest {
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	@Test
    +	public void testFileCleanup() throws Exception {
    +		final Path file = temporaryFolder.newFile().toPath();
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
    +			.setRestAddress(restAddress)
    +			.build();
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		RouteResult<?> routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), "");
    +		HttpRequest request = new DefaultFullHttpRequest(
    +			HttpVersion.HTTP_1_1,
    +			HttpMethod.GET,
    +			TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
    +			Unpooled.wrappedBuffer(new byte[0]));
    +		RoutedRequest<?> routerRequest = new RoutedRequest<>(routeResult, request);
    +
    +		Attribute<FileUploads> attribute = new SimpleAttribute();
    +		attribute.set(new FileUploads(Collections.emptyList(), Collections.singleton(file)));
    +		Channel channel = mock(Channel.class);
    +		when(channel.attr(any(AttributeKey.class))).thenReturn(attribute);
    +
    +		ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    +		when(context.channel()).thenReturn(channel);
    --- End diff --
    
    Could we use the `TestingChannelHandlerContext` instead of mocking these interfaces with Mockito?


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197049400
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -45,27 +45,26 @@
     	@SuppressWarnings("resource")
     	public static final FileUploads EMPTY = new FileUploads();
     
    +	public static FileUploads forDirectory(Path directory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		Preconditions.checkArgument(directory.isAbsolute(), "Path must be absolute.");
    +		Preconditions.checkArgument(Files.isDirectory(directory), "Path must be a directory.");
    +
    +		FileAdderVisitor visitor = new FileAdderVisitor();
    +		Files.walkFileTree(directory, visitor);
    +		files.addAll(visitor.getContainedFiles());
    +		
    +		return new FileUploads(Collections.singleton(directory), files);
    +	}
    +
     	private FileUploads() {
     		this.directoriesToClean = Collections.emptyList();
     		this.uploadedFiles = Collections.emptyList();
     	}
     
    -	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    -		final Collection<Path> files = new ArrayList<>(4);
    -		final Collection<Path> directories = new ArrayList<>(1);
    -		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    -			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    -			if (Files.isDirectory(fileOrDirectory)) {
    -				directories.add(fileOrDirectory);
    -				FileAdderVisitor visitor = new FileAdderVisitor();
    -				Files.walkFileTree(fileOrDirectory, visitor);
    -				files.addAll(visitor.getContainedFiles());
    -			} else {
    -				files.add(fileOrDirectory);
    -			}
    -		}
    -		directoriesToClean = Collections.unmodifiableCollection(directories);
    -		uploadedFiles = Collections.unmodifiableCollection(files);
    +	public FileUploads(Collection<Path> directoriesToClean, Collection<Path> uploadedFiles) {
    +		this.directoriesToClean = Preconditions.checkNotNull(directoriesToClean);
    +		this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles);
    --- End diff --
    
    I know I'm really nitpicking here, but I think it would be slightly better to let `FileUploads` represent only the upload directories and add a method `FileUploads#getFiles` that returns a `Collection<File>` of all files found in the upload directories. The difference is that we would not initialize `FileUploads` with the files, which would effectively enforce that all files reside in the given upload directories. As it stands, we could initialize this class with directories `/web/upload/a`, `/web/upload/b` and files `/web/different/path/file` that are located somewhere else. Because of this, we need to delete not only the directories but also all files individually.
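
A minimal sketch of this alternative, assuming a single upload directory: the container stores only the directory, lists its files on demand, and cleans up by deleting the directory tree. `DirectoryUploadsSketch` is a hypothetical name, and the sketch returns `Path` objects (as the diff does) rather than the `File` objects mentioned in the comment.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical variant of FileUploads that tracks only the upload directory. */
final class DirectoryUploadsSketch implements AutoCloseable {
    private final Path uploadDirectory;

    DirectoryUploadsSketch(Path uploadDirectory) {
        this.uploadDirectory = uploadDirectory;
    }

    /** Lists the files on demand, so every returned file is inside the upload directory. */
    Collection<Path> getFiles() throws IOException {
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            return paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    /** Deletes the directory tree; reverse ordering visits children before their parents. */
    @Override
    public void close() throws IOException {
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : toDelete) {
            // A handler may already have moved a file, so a missing path is not an error.
            Files.deleteIfExists(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("directory-uploads-sketch");
        Files.createFile(dir.resolve("job.jar"));

        try (DirectoryUploadsSketch uploads = new DirectoryUploadsSketch(dir)) {
            System.out.println(uploads.getFiles());
        }
        // After close() the upload directory should no longer exist.
        System.out.println(Files.exists(dir));
    }
}
```

Because no file list is passed in, cleanup reduces to removing the directory, and files outside it can never become the container's responsibility.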


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196556101
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * Tests for {@link FileUploads}.
    + */
    +public class FileUploadsTest {
    --- End diff --
    
    `extends TestLogger` is missing


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196667391
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     					final DiskFileUpload fileUpload = (DiskFileUpload) data;
     					checkState(fileUpload.isCompleted());
     
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    +					final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
     					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    +				} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +					final Attribute request = (Attribute) data;
    +					// this could also be implemented by using the first found Attribute as the payload
    +					if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +						currentJsonPayload = request.get();
    +					} else {
    +						LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    --- End diff --
    
    yes.


---

[GitHub] flink issue #6178: [FLINK-9599][rest] Implement generic mechanism to access ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6178
  
    @tillrohrmann Ready for another review.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197058127
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testUploadCleanupOnFailure() throws IOException {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(request).execute()) {
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +		assertUploadDirectoryIsEmpty();
    +	}
    +
    +	private static void assertUploadDirectoryIsEmpty() throws IOException {
    +		Preconditions.checkArgument(
    +			1 == Files.list(configuredUploadDir).count(),
    +			"Directory structure in rest upload directory has changed. Test must be adjusted");
    +		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
    +		Preconditions.checkArgument(
    +			actualUploadDir.isPresent(),
    +			"Expected upload directory does not exist.");
    +		System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
    +		assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
    --- End diff --
    
    Alright, but then the order in which methods are called in the catch block is incorrect. Maybe that should be factored out into a method to avoid code duplication.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196741255
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,483 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest extends TestLogger {
    +
    +	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    +
    +	private static RestServerEndpoint serverEndpoint;
    +	private static String serverAddress;
    +
    +	private static MultipartMixedHandler mixedHandler;
    +	private static MultipartJsonHandler jsonHandler;
    +	private static MultipartFileHandler fileHandler;
    +	private static File file1;
    +	private static File file2;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(RestOptions.PORT, 0);
    +		config.setString(RestOptions.ADDRESS, "localhost");
    +		config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
    +			.setRestAddress(restAddress)
    +			.build();
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		file1 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +		file2 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
    +			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
    +			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
    +			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
    +
    +		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
    +
    +		serverEndpoint.start();
    +		serverAddress = serverEndpoint.getRestBaseUrl();
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		if (serverEndpoint != null) {
    +			serverEndpoint.close();
    +			serverEndpoint = null;
    +		}
    +	}
    +
    +	private static Request buildFileRequest(String headerUrl) {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildJsonRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildMixedRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request finalizeRequest(MultipartBody.Builder builder, String headerUrl) {
    +		MultipartBody multipartBody = builder
    +			.setType(MultipartBody.FORM)
    +			.build();
    +
    +		return new Request.Builder()
    +			.url(serverAddress + headerUrl)
    +			.post(multipartBody)
    +			.build();
    +	}
    +
    +	private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) {
    +		okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
    +		okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
    +
    +		return builder.addFormDataPart("file1", file1.getName(), filePayload1)
    +			.addFormDataPart("file2", file2.getName(), filePayload2);
    +	}
    +
    +	private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index) throws IOException {
    +		TestRequestBody jsonRequestBody = new TestRequestBody(index);
    +
    +		StringWriter sw = new StringWriter();
    +		OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
    +
    +		String jsonPayload = sw.toString();
    +
    +		return builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST, jsonPayload);
    +	}
    +
    +	@Test
    +	public void testMixedMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// explicitly rejected by the test handler implementation
    +			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// expected JSON payload is missing
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		int mixedId = RANDOM.nextInt();
    +		Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId);
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(mixedId, mixedHandler.lastReceivedRequest.index);
    +		}
    +	}
    +
    +	@Test
    +	public void testJsonMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		int jsonId = RANDOM.nextInt();
    +		Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId);
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(jsonId, jsonHandler.lastReceivedRequest.index);
    +		}
    +
    +		Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// either because JSON payload is missing or FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    +
    +	@Test
    +	public void testFileMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    --- End diff --
    
    We should also add a test which verifies that the upload directory gets cleaned up after the request has been processed.
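
    A minimal sketch of the check such a test could end with, assuming the test knows the directory that uploads are written to. The class `UploadDirAssertions` and its methods are hypothetical and not part of the PR; only `java.nio.file` calls are used.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical test helper (not part of the PR). */
final class UploadDirAssertions {

	private UploadDirAssertions() {
	}

	/** Counts the entries directly inside the given directory. */
	static long countEntries(Path directory) throws IOException {
		// Files.list keeps a directory handle open until the stream is closed,
		// so the stream is wrapped in try-with-resources.
		try (Stream<Path> entries = Files.list(directory)) {
			return entries.count();
		}
	}

	/** Fails if anything is left in the directory after the request was processed. */
	static void assertDirectoryIsEmpty(Path directory) throws IOException {
		long remaining = countEntries(directory);
		if (remaining != 0) {
			throw new AssertionError("Not all files were cleaned up, remaining entries: " + remaining);
		}
	}
}
```

    A test would send the request, wait for the response, and then call `UploadDirAssertions.assertDirectoryIsEmpty(uploadDir)`.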


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197055449
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	public static FileUploads forDirectory(Path directory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		Preconditions.checkArgument(directory.isAbsolute(), "Path must be absolute.");
    +		Preconditions.checkArgument(Files.isDirectory(directory), "Path must be a directory.");
    +
    +		FileAdderVisitor visitor = new FileAdderVisitor();
    +		Files.walkFileTree(directory, visitor);
    +		files.addAll(visitor.getContainedFiles());
    +		
    +		return new FileUploads(Collections.singleton(directory), files);
    +	}
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> directoriesToClean, Collection<Path> uploadedFiles) {
    +		this.directoriesToClean = Preconditions.checkNotNull(directoriesToClean);
    +		this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles);
    --- End diff --
    
    I know this is nitpicking, but I think it would be slightly better to let `FileUploads` represent only the upload directories and to add a method `FileUploads#getFiles` that returns a `Collection<File>` of all files found in the upload directories, instead of initializing `FileUploads` with the files. That would effectively enforce that all files reside in the given upload directories. As it stands, we could initialize this class with the directories /web/upload/a and /web/upload/b and a file /web/different/path/file that is located somewhere else. Because of this, we need to delete not only the directories but also all files.
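
    The suggestion could be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: the class name `DirectoryBackedUploads` is hypothetical, it tracks a single directory, it returns `Path` rather than `File`, and plain Java checks stand in for Flink's `Preconditions`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical container that only knows its upload directory (not part of the PR). */
final class DirectoryBackedUploads implements AutoCloseable {

	private final Path uploadDirectory;

	DirectoryBackedUploads(Path uploadDirectory) {
		if (!uploadDirectory.isAbsolute()) {
			throw new IllegalArgumentException("Path must be absolute.");
		}
		this.uploadDirectory = uploadDirectory;
	}

	/** Lists all regular files below the upload directory; files elsewhere can never appear here. */
	Collection<Path> getFiles() throws IOException {
		try (Stream<Path> paths = Files.walk(uploadDirectory)) {
			return paths.filter(Files::isRegularFile).collect(Collectors.toList());
		}
	}

	/** Deleting the directory tree is sufficient, since every file lives inside it. */
	@Override
	public void close() throws IOException {
		if (!Files.exists(uploadDirectory)) {
			return;
		}
		final List<Path> toDelete;
		try (Stream<Path> paths = Files.walk(uploadDirectory)) {
			// Reverse order puts children before their parent directories.
			toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
		}
		for (Path path : toDelete) {
			Files.delete(path);
		}
	}
}
```

    With this shape, cleanup no longer needs a separate pass over individually registered files, which is the point of the remark above.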


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196836698
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		final Collection<Path> directories = new ArrayList<>(1);
    +		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +			if (Files.isDirectory(fileOrDirectory)) {
    +				directories.add(fileOrDirectory);
    +				FileAdderVisitor visitor = new FileAdderVisitor();
    +				Files.walkFileTree(fileOrDirectory, visitor);
    +				files.addAll(visitor.getContainedFiles());
    +			} else {
    +				files.add(fileOrDirectory);
    +			}
    --- End diff --
    
    we don't have to, it's for testing convenience as noted in the class javadocs.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196452755
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     					final DiskFileUpload fileUpload = (DiskFileUpload) data;
     					checkState(fileUpload.isCompleted());
     
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    +					final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
     					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    +				} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +					final Attribute request = (Attribute) data;
    +					// this could also be implemented by using the first found Attribute as the payload
    +					if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +						currentJsonPayload = request.get();
    +					} else {
    +						LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    --- End diff --
    
    Should we rather fail?


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196740578
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java ---
    @@ -39,15 +41,21 @@
     public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
     
     	private final R requestBody;
    +	private final FileUploads uploadedFiles;
    --- End diff --
    
    This comment has not been addressed. I think the `HandlerRequest` should not know about the `FileUploads` because it can be used to delete the files.
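
    One possible reading of this concern, as a sketch only: the request object receives a read-only collection of file paths, while whoever created the uploads keeps the closeable container and stays responsible for cleanup. `UploadAwareRequest` is a hypothetical stand-in and not the PR's `HandlerRequest`.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical stand-in for a request object (not the PR's HandlerRequest). */
final class UploadAwareRequest {

	private final Collection<Path> uploadedFiles;

	UploadAwareRequest(Collection<Path> uploadedFiles) {
		// Defensive copy, exposed as unmodifiable: the request holds no handle
		// that could trigger cleanup of the upload directory.
		this.uploadedFiles = Collections.unmodifiableCollection(new ArrayList<>(uploadedFiles));
	}

	Collection<Path> getUploadedFiles() {
		return uploadedFiles;
	}
}
```

    This only removes the cleanup handle from the request. A handler could still delete an individual path itself, so it narrows the API rather than enforcing anything at the file-system level.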


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197050066
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.router.RouteResult;
    +import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
    +import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Collections;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for {@link AbstractHandler}.
    + */
    +public class AbstractHandlerTest {
    --- End diff --
    
    `TestLogger` is missing; this test class should extend it.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197053303
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testUploadCleanupOnFailure() throws IOException {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(request).execute()) {
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +		assertUploadDirectoryIsEmpty();
    +	}
    +
    +	private static void assertUploadDirectoryIsEmpty() throws IOException {
    +		Preconditions.checkArgument(
    +			1 == Files.list(configuredUploadDir).count(),
    +			"Directory structure in rest upload directory has changed. Test must be adjusted");
    +		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
    +		Preconditions.checkArgument(
    +			actualUploadDir.isPresent(),
    +			"Expected upload directory does not exist.");
    +		System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
    +		assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
    --- End diff --
    
    Why don't we run into a race condition with the catch block of `FileUploadHandler`?


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196452024
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     					final DiskFileUpload fileUpload = (DiskFileUpload) data;
     					checkState(fileUpload.isCompleted());
     
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    +					final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
     					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    +				} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +					final Attribute request = (Attribute) data;
    +					// this could also be implemented by using the first found Attribute as the payload
    +					if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +						currentJsonPayload = request.get();
    +					} else {
    +						LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    +					}
     				}
     			}
     
     			if (httpContent instanceof LastHttpContent) {
    +				ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    +				ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
    --- End diff --
    
    I think it would be better to not store the JSON payload as an `Attribute` but instead forward it via `httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196554025
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java ---
    @@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
     				return;
     			}
     
    -			ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
    +			final ByteBuf msgContent;
    +			Optional<byte[]> multipartJsonPayload = FileUploadHandler.getMultipartJsonPayload(ctx);
    +			if (multipartJsonPayload.isPresent()) {
    +				msgContent = Unpooled.wrappedBuffer(multipartJsonPayload.get());
    --- End diff --
    
    Let's send the JSON payload as a proper `HttpRequest`; then we don't need this special casing here.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197085237
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -151,18 +143,26 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     				ctx.fireChannelRead(msg);
     			}
     		} catch (Exception e) {
    -			HttpRequest tmpRequest = currentHttpRequest;
    -			deleteUploadedFiles();
    -			reset();
    -			LOG.warn("Internal server error. File upload failed.", e);
    -			HandlerUtils.sendErrorResponse(
    -				ctx,
    -				tmpRequest,
    -				new ErrorResponseBody("File upload failed."),
    -				HttpResponseStatus.INTERNAL_SERVER_ERROR,
    -				Collections.emptyMap()
    -			);
    +			handleError(ctx, "File upload failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
    +		}
    +	}
    +
    +	private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
    +		HttpRequest tmpRequest = currentHttpRequest;
    +		deleteUploadedFiles();
    +		reset();
    +		if (e == null) {
    +			LOG.warn(errorMessage);
    +		} else {
    +			LOG.warn(errorMessage, e);
    --- End diff --
    
    But this will also print "null", won't it? That's what I was trying to avoid here.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197054545
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.router.RouteResult;
    +import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
    +import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Collections;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for {@link AbstractHandler}.
    + */
    +public class AbstractHandlerTest {
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	@Test
    +	public void testFileCleanup() throws Exception {
    +		final Path file = temporaryFolder.newFile().toPath();
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
    +			.setRestAddress(restAddress)
    +			.build();
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		RouteResult<?> routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), "");
    +		HttpRequest request = new DefaultFullHttpRequest(
    +			HttpVersion.HTTP_1_1,
    +			HttpMethod.GET,
    +			TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
    +			Unpooled.wrappedBuffer(new byte[0]));
    +		RoutedRequest<?> routerRequest = new RoutedRequest<>(routeResult, request);
    +
    +		Attribute<FileUploads> attribute = new SimpleAttribute();
    +		attribute.set(new FileUploads(Collections.emptyList(), Collections.singleton(file)));
    +		Channel channel = mock(Channel.class);
    +		when(channel.attr(any(AttributeKey.class))).thenReturn(attribute);
    +
    +		ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    +		when(context.channel()).thenReturn(channel);
    --- End diff --
    
    Yes, I guess one would have to pull it out and create a builder similar to `TestingRestfulGateway`. One would not have to add a builder method for every method up front; the builder could be extended incrementally as needed. But this might be too much work, given that `ChannelHandlerContext` is hardly used by our tests.
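
The incremental builder idea could look roughly like the sketch below. It is only an illustration: `Context` is a hypothetical two-method stand-in, not Netty's `ChannelHandlerContext`, and `TestingContextBuilder` with its setters is an assumed name that does not exist in Flink. Every method gets a default behavior, and a setter is added only once a test needs to override it.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for the interface under test; NOT Netty's ChannelHandlerContext. */
interface Context {
	String name();

	Object attribute(String key);
}

/** Builder sketch: every method has a default, setters are added only when a test needs them. */
final class TestingContextBuilder {
	private Supplier<String> nameSupplier = () -> "testing-context";
	private Function<String, Object> attributeFunction = key -> null;

	TestingContextBuilder setNameSupplier(Supplier<String> nameSupplier) {
		this.nameSupplier = nameSupplier;
		return this;
	}

	TestingContextBuilder setAttributeFunction(Function<String, Object> attributeFunction) {
		this.attributeFunction = attributeFunction;
		return this;
	}

	Context build() {
		// capture the current functions so later builder changes do not affect this instance
		final Supplier<String> name = nameSupplier;
		final Function<String, Object> attribute = attributeFunction;
		return new Context() {
			@Override
			public String name() {
				return name.get();
			}

			@Override
			public Object attribute(String key) {
				return attribute.apply(key);
			}
		};
	}
}
```

A test would then only configure what it cares about, e.g. `new TestingContextBuilder().setAttributeFunction(key -> uploads).build()`, and rely on the defaults for everything else.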


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196868266
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,483 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest extends TestLogger {
    +
    +	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    +
    +	private static RestServerEndpoint serverEndpoint;
    +	private static String serverAddress;
    +
    +	private static MultipartMixedHandler mixedHandler;
    +	private static MultipartJsonHandler jsonHandler;
    +	private static MultipartFileHandler fileHandler;
    +	private static File file1;
    +	private static File file2;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(RestOptions.PORT, 0);
    +		config.setString(RestOptions.ADDRESS, "localhost");
    +		config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
    +			.setRestAddress(restAddress)
    +			.build();
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		file1 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +		file2 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
    +			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
    +			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
    +			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
    +
    +		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
    +
    +		serverEndpoint.start();
    +		serverAddress = serverEndpoint.getRestBaseUrl();
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		if (serverEndpoint != null) {
    +			serverEndpoint.close();
    +			serverEndpoint = null;
    +		}
    +	}
    +
    +	private static Request buildFileRequest(String headerUrl) {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildJsonRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildMixedRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request finalizeRequest(MultipartBody.Builder builder, String headerUrl) {
    +		MultipartBody multipartBody = builder
    +			.setType(MultipartBody.FORM)
    +			.build();
    +
    +		return new Request.Builder()
    +			.url(serverAddress + headerUrl)
    +			.post(multipartBody)
    +			.build();
    +	}
    +
    +	private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) {
    +		okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
    +		okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
    +
    +		return builder.addFormDataPart("file1", file1.getName(), filePayload1)
    +			.addFormDataPart("file2", file2.getName(), filePayload2);
    +	}
    +
    +	private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index) throws IOException {
    +		TestRequestBody jsonRequestBody = new TestRequestBody(index);
    +
    +		StringWriter sw = new StringWriter();
    +		OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
    +
    +		String jsonPayload = sw.toString();
    +
    +		return builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST, jsonPayload);
    +	}
    +
    +	@Test
    +	public void testMixedMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// explicitly rejected by the test handler implementation
    +			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// expected JSON payload is missing
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		int mixedId = RANDOM.nextInt();
    +		Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId);
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(mixedId, mixedHandler.lastReceivedRequest.index);
    +		}
    +	}
    +
    +	@Test
    +	public void testJsonMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		int jsonId = RANDOM.nextInt();
    +		Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId);
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(jsonId, jsonHandler.lastReceivedRequest.index);
    +		}
    +
    +		Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// either because JSON payload is missing or FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    +
    +	@Test
    +	public void testFileMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    --- End diff --
    
    The underlying issue is that we don't have unit tests for `AbstractHandler` and `AbstractRestHandler`.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196452418
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
     		currentHttpPostRequestDecoder.destroy();
     		currentHttpPostRequestDecoder = null;
     		currentHttpRequest = null;
    +		currentUploadDir = null;
    +		currentJsonPayload = null;
    +	}
    +
    +	public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +	}
    --- End diff --
    
    By sending the JSON payload downstream, we could avoid having this method.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196455211
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java ---
    @@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
     				return;
     			}
     
    -			ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
    +			final ByteBuf msgContent;
    +			Optional<byte[]> multipartJsonPayload = FileUploadHandler.getMultipartJsonPayload(ctx);
    +			if (multipartJsonPayload.isPresent()) {
    +				msgContent = Unpooled.wrappedBuffer(multipartJsonPayload.get());
    +			} else {
    +				msgContent = ((FullHttpRequest) httpRequest).content();
    +			}
     
    -			R request;
    -			if (isFileUpload()) {
    -				final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
    -				if (path == null) {
    -					HandlerUtils.sendErrorResponse(
    -						ctx,
    -						httpRequest,
    -						new ErrorResponseBody("Client did not upload a file."),
    -						HttpResponseStatus.BAD_REQUEST,
    -						responseHeaders);
    -					return;
    -				}
    -				//noinspection unchecked
    -				request = (R) new FileUpload(path);
    -			} else if (msgContent.capacity() == 0) {
    -				try {
    -					request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
    -				} catch (JsonParseException | JsonMappingException je) {
    -					log.error("Request did not conform to expected format.", je);
    -					HandlerUtils.sendErrorResponse(
    -						ctx,
    -						httpRequest,
    -						new ErrorResponseBody("Bad request received."),
    -						HttpResponseStatus.BAD_REQUEST,
    -						responseHeaders);
    -					return;
    +			try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
    --- End diff --
    
    I would obtain the upload directory from `FileUploadHandler` and simply delete this directory after the call has been processed. We could then also create `FileUploads` outside of the `FileUploadHandler` and instantiate a `HandlerRequest` with it. This would also simplify the `FileUploads` class significantly, because it would no longer be responsible for deleting the files.
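
A minimal sketch of the "delete the directory after the call" approach, assuming the handler knows the per-request upload directory. `UploadDirCleanup` and `deleteUploadDirectory` are hypothetical names, not part of Flink or this PR; only `java.nio.file` is used.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical helper: removes a per-request upload directory once the request has been handled. */
final class UploadDirCleanup {

	private UploadDirCleanup() {
	}

	/** Recursively deletes the given directory; a missing directory is a no-op. */
	static void deleteUploadDirectory(Path uploadDir) throws IOException {
		if (!Files.exists(uploadDir)) {
			return;
		}
		Files.walkFileTree(uploadDir, new SimpleFileVisitor<Path>() {
			@Override
			public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
				// deleteIfExists does not fail if the file is already gone
				Files.deleteIfExists(file);
				return FileVisitResult.CONTINUE;
			}

			@Override
			public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
				if (exc != null) {
					throw exc;
				}
				// a directory is deleted only after its contents have been removed
				Files.delete(dir);
				return FileVisitResult.CONTINUE;
			}
		});
	}
}
```

With cleanup owned by the caller like this, a container such as `FileUploads` would only need to expose the uploaded paths. The call would typically sit in a `finally` block around the request handling so that the directory is removed on both success and failure.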


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196555487
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		final Collection<Path> directories = new ArrayList<>(1);
    +		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +			if (Files.isDirectory(fileOrDirectory)) {
    +				directories.add(fileOrDirectory);
    +				FileAdderVisitor visitor = new FileAdderVisitor();
    +				Files.walkFileTree(fileOrDirectory, visitor);
    +				files.addAll(visitor.get());
    +			} else {
    +				files.add(fileOrDirectory);
    +			}
    +		}
    +		directoriesToClean = Collections.unmodifiableCollection(directories);
    +		uploadedFiles = Collections.unmodifiableCollection(files);
    +	}
    +
    +	public Collection<Path> getUploadedFiles() {
    +		return uploadedFiles;
    +	}
    +
    +	@Override
    +	public void close() throws IOException {
    +		for (Path file : uploadedFiles) {
    +			try {
    +				Files.delete(file);
    +			} catch (FileNotFoundException ignored) {
    +				// file may have been moved by a handler
    +			}
    +		}
    +		for (Path directory : directoriesToClean) {
    +			Files.walkFileTree(directory, CleanupFileVisitor.get());
    +		}
    +	}
    +
    +	private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
    +
    +		private final Collection<Path> files = new ArrayList<>(4);
    +
    +		Collection<Path> get() {
    +			return files;
    +		}
    +
    +		FileAdderVisitor() {
    +		}
    +
    +		@Override
    +		public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    +			FileVisitResult result = super.visitFile(file, attrs);
    +			files.add(file);
    +			return result;
    +		}
    +	}
    +
    +	private static final class CleanupFileVisitor extends SimpleFileVisitor<Path> {
    --- End diff --
    
    I think it would be better to make this an enum. Then we get all singleton properties for free.
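
For illustration, an enum-based visitor might look like the sketch below. The body of the PR's `CleanupFileVisitor` is not shown in the quoted diff, so the delete logic here is an assumption. Since a Java enum cannot extend `SimpleFileVisitor`, the sketch implements the `FileVisitor` interface directly and spells out all four callbacks.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

/** Sketch of a stateless cleanup visitor as an enum singleton (assumed behavior: delete everything). */
enum CleanupFileVisitor implements FileVisitor<Path> {
	INSTANCE;

	@Override
	public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
		return FileVisitResult.CONTINUE;
	}

	@Override
	public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
		Files.delete(file);
		return FileVisitResult.CONTINUE;
	}

	@Override
	public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
		// propagate the failure instead of silently skipping the file
		throw exc;
	}

	@Override
	public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
		if (exc != null) {
			throw exc;
		}
		// the directory is empty at this point, so it can be removed
		Files.delete(dir);
		return FileVisitResult.CONTINUE;
	}
}
```

Call sites would then use `Files.walkFileTree(directory, CleanupFileVisitor.INSTANCE)` instead of a static `get()` accessor. The trade-off is the extra boilerplate for the callbacks that `SimpleFileVisitor` would otherwise provide.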


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197078502
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -151,18 +143,26 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     				ctx.fireChannelRead(msg);
     			}
     		} catch (Exception e) {
    -			HttpRequest tmpRequest = currentHttpRequest;
    -			deleteUploadedFiles();
    -			reset();
    -			LOG.warn("Internal server error. File upload failed.", e);
    -			HandlerUtils.sendErrorResponse(
    -				ctx,
    -				tmpRequest,
    -				new ErrorResponseBody("File upload failed."),
    -				HttpResponseStatus.INTERNAL_SERVER_ERROR,
    -				Collections.emptyMap()
    -			);
    +			handleError(ctx, "File upload failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
    +		}
    +	}
    +
    +	private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
    +		HttpRequest tmpRequest = currentHttpRequest;
    +		deleteUploadedFiles();
    +		reset();
    +		if (e == null) {
    +			LOG.warn(errorMessage);
    +		} else {
    +			LOG.warn(errorMessage, e);
    --- End diff --
    
    I don't think we have to make this distinction here. `LOG.warn(errorMessage, null)` should also work.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196560163
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest {
    +
    +	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    +
    +	private static RestServerEndpoint serverEndpoint;
    +	private static String serverAddress;
    +
    +	private static MultipartMixedHandler mixedHandler;
    +	private static MultipartJsonHandler jsonHandler;
    +	private static MultipartFileHandler fileHandler;
    +	private static File file1;
    +	private static File file2;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(RestOptions.PORT, 0);
    +		config.setString(RestOptions.ADDRESS, "localhost");
    +		config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
    +		when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		file1 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +		file2 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
    +			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
    +			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
    +			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
    +
    +		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
    +
    +		serverEndpoint.start();
    +		serverAddress = serverEndpoint.getRestBaseUrl();
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		if (serverEndpoint != null) {
    +			serverEndpoint.close();
    +			serverEndpoint = null;
    +		}
    +	}
    +
    +	private static Request buildRequest(String headerUrl, int index, boolean includeFile, boolean includeJson) throws IOException {
    +		Preconditions.checkArgument(includeFile || includeJson, "You have to either include JSON or a file.");
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +
    +		if (includeFile) {
    +			okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
    +			builder = builder.addFormDataPart("file1", file1.getName(), filePayload1);
    +
    +			okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
    +			builder = builder.addFormDataPart("file2", file2.getName(), filePayload2);
    +		}
    +
    +		if (includeJson) {
    +			TestRequestBody jsonRequestBody = new TestRequestBody(index);
    +
    +			StringWriter sw = new StringWriter();
    +			OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
    +
    +			String jsonPayload = sw.toString();
    +
    +			builder = builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST, jsonPayload);
    +		}
    +
    +		MultipartBody multipartBody = builder
    +			.setType(MultipartBody.FORM)
    +			.build();
    +
    +		return new Request.Builder()
    +			.url(serverAddress + headerUrl)
    +			.post(multipartBody)
    +			.build();
    +	}
    +
    +	@Test
    +	public void testMixedMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt(), false, true);
    --- End diff --
    
    It would be good to have separate methods `buildFileRequest`, `buildJsonRequest`, and `buildJsonFileRequest` instead of the version with boolean flags. Booleans make it harder to see which request is generated.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196698434
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
     	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
     		if (msg instanceof HttpRequest) {
     			final HttpRequest httpRequest = (HttpRequest) msg;
    +			LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
     			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
     				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
     					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
     					currentHttpRequest = httpRequest;
    +					currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
    --- End diff --
    
    If the `FileUploadHandler` itself fails, the upload directory isn't cleaned up, but that was already the case in the existing code. The handler is generally rather _light_ when it comes to failure handling (i.e. it doesn't do anything in that regard).


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196558949
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java ---
    @@ -63,4 +63,13 @@
     	 * @return description for the header
     	 */
     	String getDescription();
    +
    +	/**
    +	 * Returns whether this header allows file uploads.
    +	 *
    +	 * @return whether this header allows file uploads
    +	 */
    +	default boolean acceptsFileUploads() {
    +		return false;
    +	}
    --- End diff --
    
    Should this maybe go into `UntypedResponseMessageHeaders`? At the moment one can upload files for an `AbstractHandler` implementation (e.g. `AbstractTaskManagerFileHandler`) and access them via the `HandlerRequest`, without being able to specify whether file uploads are allowed or not.
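
    To illustrate the opt-in behaviour under discussion, here is a minimal sketch of a default method that rejects uploads unless a header overrides it. `SketchHeaders` and `UploadGate` are hypothetical names for illustration only; they are not Flink's `MessageHeaders` or `AbstractHandler`, and the real check may look different.

```java
import java.util.List;

/** Hypothetical stand-in for a headers interface; not Flink's actual MessageHeaders. */
interface SketchHeaders {
	String getTargetUrl();

	/** Uploads are rejected unless a header explicitly opts in by overriding this method. */
	default boolean acceptsFileUploads() {
		return false;
	}
}

/** Hypothetical gate that a handler base class could consult before invoking the handler. */
final class UploadGate {
	private UploadGate() {}

	/** Returns true if a request carrying the given uploaded file names may reach the handler. */
	static boolean isAllowed(SketchHeaders headers, List<String> uploadedFileNames) {
		// Requests without files always pass; requests with files need the explicit opt-in.
		return uploadedFileNames.isEmpty() || headers.acceptsFileUploads();
	}
}
```

    Placing the default on the most general headers type means every handler gets a safe default without having to declare anything.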


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196740018
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
     
     	@Override
     	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
    -		if (msg instanceof HttpRequest) {
    -			final HttpRequest httpRequest = (HttpRequest) msg;
    -			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    -				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
    -					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    -					currentHttpRequest = httpRequest;
    +		try {
    +			if (msg instanceof HttpRequest) {
    +				final HttpRequest httpRequest = (HttpRequest) msg;
    +				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
    +				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    +					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
    +						currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    +						currentHttpRequest = httpRequest;
    +						currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
    +					} else {
    +						ctx.fireChannelRead(msg);
    +					}
     				} else {
     					ctx.fireChannelRead(msg);
     				}
    +			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
    +				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
    +				RestServerEndpoint.createUploadDir(uploadDir, LOG);
    +
    +				final HttpContent httpContent = (HttpContent) msg;
    +				currentHttpPostRequestDecoder.offer(httpContent);
    +
    +				while (currentHttpPostRequestDecoder.hasNext()) {
    +					final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
    +					if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
    +						final DiskFileUpload fileUpload = (DiskFileUpload) data;
    +						checkState(fileUpload.isCompleted());
    +
    +						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
    +						fileUpload.renameTo(dest.toFile());
    +					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +						final Attribute request = (Attribute) data;
    +						// this could also be implemented by using the first found Attribute as the payload
    +						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +							currentJsonPayload = request.get();
    +						} else {
    +							LOG.warn("Received unknown attribute {}.", data.getName());
    +							HandlerUtils.sendErrorResponse(
    +								ctx,
    +								currentHttpRequest,
    +								new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
    +								HttpResponseStatus.BAD_REQUEST,
    +								Collections.emptyMap()
    +							);
    +							deleteUploadedFiles();
    +							reset();
    +							return;
    +						}
    +					}
    +				}
    +
    +				if (httpContent instanceof LastHttpContent) {
    +					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    +					ctx.fireChannelRead(currentHttpRequest);
    +					if (currentJsonPayload != null) {
    +						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
    +					} else {
    +						ctx.fireChannelRead(httpContent);
    +					}
    +					reset();
    +				}
     			} else {
     				ctx.fireChannelRead(msg);
     			}
    -		} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
    -			// make sure that we still have a upload dir in case that it got deleted in the meanwhile
    -			RestServerEndpoint.createUploadDir(uploadDir, LOG);
    -
    -			final HttpContent httpContent = (HttpContent) msg;
    -			currentHttpPostRequestDecoder.offer(httpContent);
    -
    -			while (currentHttpPostRequestDecoder.hasNext()) {
    -				final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
    -				if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
    -					final DiskFileUpload fileUpload = (DiskFileUpload) data;
    -					checkState(fileUpload.isCompleted());
    -
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    -					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    -				}
    -			}
    +		} catch (Exception e) {
    +			LOG.warn("Internal server error. File upload failed.", e);
    +			HandlerUtils.sendErrorResponse(
    +				ctx,
    +				currentHttpRequest,
    +				new ErrorResponseBody("File upload failed."),
    +				HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +				Collections.emptyMap()
    +			);
    +			deleteUploadedFiles();
    +			reset();
    +		}
    +	}
     
    -			if (httpContent instanceof LastHttpContent) {
    -				ctx.fireChannelRead(currentHttpRequest);
    -				ctx.fireChannelRead(httpContent);
    -				reset();
    +	private void deleteUploadedFiles() {
    +		if (currentUploadDir != null) {
    +			try (FileUploads uploads = new FileUploads(Collections.singleton(currentUploadDir))) {
    --- End diff --
    
    Why do we create a `FileUploads` instance instead of simply calling `FileUtils.deleteDirectory(currentUploadDir)`?
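
    For comparison, a direct recursive delete of the per-request upload directory can be sketched with the JDK alone. `UploadDirCleanup` and `deleteRecursively` are hypothetical names; this is not the implementation of Flink's `FileUtils.deleteDirectory`, only an assumption about what a direct delete would roughly involve.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical helper; a JDK-only sketch of deleting an upload directory directly. */
final class UploadDirCleanup {
	private UploadDirCleanup() {}

	/** Deletes the directory and everything below it; does nothing if it is null or missing. */
	static void deleteRecursively(Path directory) throws IOException {
		if (directory == null || !Files.exists(directory)) {
			return;
		}
		Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
			@Override
			public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
				Files.delete(file);
				return FileVisitResult.CONTINUE;
			}

			@Override
			public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
				if (exc != null) {
					throw exc;
				}
				// The directory is empty at this point, so it can be removed.
				Files.delete(dir);
				return FileVisitResult.CONTINUE;
			}
		});
	}
}
```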


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196849558
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,483 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest extends TestLogger {
    +
    +	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    +
    +	private static RestServerEndpoint serverEndpoint;
    +	private static String serverAddress;
    +
    +	private static MultipartMixedHandler mixedHandler;
    +	private static MultipartJsonHandler jsonHandler;
    +	private static MultipartFileHandler fileHandler;
    +	private static File file1;
    +	private static File file2;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(RestOptions.PORT, 0);
    +		config.setString(RestOptions.ADDRESS, "localhost");
    +		config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
    +			.setRestAddress(restAddress)
    +			.build();
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		file1 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +		file2 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
    +			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
    +			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
    +			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
    +
    +		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
    +
    +		serverEndpoint.start();
    +		serverAddress = serverEndpoint.getRestBaseUrl();
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		if (serverEndpoint != null) {
    +			serverEndpoint.close();
    +			serverEndpoint = null;
    +		}
    +	}
    +
    +	private static Request buildFileRequest(String headerUrl) {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildJsonRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildMixedRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request finalizeRequest(MultipartBody.Builder builder, String headerUrl) {
    +		MultipartBody multipartBody = builder
    +			.setType(MultipartBody.FORM)
    +			.build();
    +
    +		return new Request.Builder()
    +			.url(serverAddress + headerUrl)
    +			.post(multipartBody)
    +			.build();
    +	}
    +
    +	private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) {
    +		okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
    +		okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
    +
    +		return builder.addFormDataPart("file1", file1.getName(), filePayload1)
    +			.addFormDataPart("file2", file2.getName(), filePayload2);
    +	}
    +
    +	private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index) throws IOException {
    +		TestRequestBody jsonRequestBody = new TestRequestBody(index);
    +
    +		StringWriter sw = new StringWriter();
    +		OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
    +
    +		String jsonPayload = sw.toString();
    +
    +		return builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST, jsonPayload);
    +	}
    +
    +	@Test
    +	public void testMixedMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// explicitly rejected by the test handler implementation
    +			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// expected JSON payload is missing
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		int mixedId = RANDOM.nextInt();
    +		Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId);
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(mixedId, mixedHandler.lastReceivedRequest.index);
    +		}
    +	}
    +
    +	@Test
    +	public void testJsonMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		int jsonId = RANDOM.nextInt();
    +		Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId);
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(jsonId, jsonHandler.lastReceivedRequest.index);
    +		}
    +
    +		Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// either because JSON payload is missing or FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    +
    +	@Test
    +	public void testFileMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    --- End diff --
    
    It's a little tricky to test, since the handler responds to the request before cleaning up the files (the response is sent within the try-with-resources statement).
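
    The ordering problem can be shown with a small JDK-only sketch: anything done inside a try-with-resources body happens before `close()` runs, so a client may see the response while the files still exist. `CleanupOrderSketch` and `TrackedUploads` are hypothetical names and only mimic the shape of the handler code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of why the response precedes the cleanup. */
final class CleanupOrderSketch {
	private CleanupOrderSketch() {}

	/** Stand-in for an AutoCloseable upload container that records when it is cleaned up. */
	static final class TrackedUploads implements AutoCloseable {
		private final List<String> events;

		TrackedUploads(List<String> events) {
			this.events = events;
		}

		@Override
		public void close() {
			events.add("cleanup");
		}
	}

	/** Simulates a handler that responds inside the try-with-resources body. */
	static List<String> handleRequest() {
		List<String> events = new ArrayList<>();
		try (TrackedUploads uploads = new TrackedUploads(events)) {
			// The response is sent here, while the resource is still open.
			events.add("respond");
		}
		// close() has run by now, after the response was recorded.
		return events;
	}
}
```

    A test that checks for deleted files right after receiving the response therefore races against the cleanup.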


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197086869
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -151,18 +143,26 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     				ctx.fireChannelRead(msg);
     			}
     		} catch (Exception e) {
    -			HttpRequest tmpRequest = currentHttpRequest;
    -			deleteUploadedFiles();
    -			reset();
    -			LOG.warn("Internal server error. File upload failed.", e);
    -			HandlerUtils.sendErrorResponse(
    -				ctx,
    -				tmpRequest,
    -				new ErrorResponseBody("File upload failed."),
    -				HttpResponseStatus.INTERNAL_SERVER_ERROR,
    -				Collections.emptyMap()
    -			);
    +			handleError(ctx, "File upload failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
    +		}
    +	}
    +
    +	private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
    +		HttpRequest tmpRequest = currentHttpRequest;
    +		deleteUploadedFiles();
    +		reset();
    +		if (e == null) {
    +			LOG.warn(errorMessage);
    +		} else {
    +			LOG.warn(errorMessage, e);
    --- End diff --
    
    I don't think so.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6178


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196081080
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,455 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest {
    --- End diff --
    
    needs a test for
    * multiple files
    * name of uploaded file should be what is specified in the request


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196557260
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java ---
    @@ -39,15 +41,21 @@
     public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
     
     	private final R requestBody;
    +	private final FileUploads uploadedFiles;
    --- End diff --
    
    This could also be a `Collection<File>`.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196740407
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		final Collection<Path> directories = new ArrayList<>(1);
    +		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +			if (Files.isDirectory(fileOrDirectory)) {
    +				directories.add(fileOrDirectory);
    +				FileAdderVisitor visitor = new FileAdderVisitor();
    +				Files.walkFileTree(fileOrDirectory, visitor);
    +				files.addAll(visitor.getContainedFiles());
    +			} else {
    +				files.add(fileOrDirectory);
    +			}
    --- End diff --
    
    Do we have to allow files and directories alike? Why not require callers to provide an upload directory that contains all uploaded files? This would make the whole cleanup logic easier.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196695325
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
     	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
     		if (msg instanceof HttpRequest) {
     			final HttpRequest httpRequest = (HttpRequest) msg;
    +			LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
     			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
     				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
     					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
     					currentHttpRequest = httpRequest;
    +					currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
    --- End diff --
    
    How do we clean up the `currentUploadDir` in case of a failure?


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196836575
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
     
     	@Override
     	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
    -		if (msg instanceof HttpRequest) {
    -			final HttpRequest httpRequest = (HttpRequest) msg;
    -			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    -				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
    -					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    -					currentHttpRequest = httpRequest;
    +		try {
    +			if (msg instanceof HttpRequest) {
    +				final HttpRequest httpRequest = (HttpRequest) msg;
    +				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
    +				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    +					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
    +						currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    +						currentHttpRequest = httpRequest;
    +						currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
    +					} else {
    +						ctx.fireChannelRead(msg);
    +					}
     				} else {
     					ctx.fireChannelRead(msg);
     				}
    +			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
    +				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
    +				RestServerEndpoint.createUploadDir(uploadDir, LOG);
    +
    +				final HttpContent httpContent = (HttpContent) msg;
    +				currentHttpPostRequestDecoder.offer(httpContent);
    +
    +				while (currentHttpPostRequestDecoder.hasNext()) {
    +					final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
    +					if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
    +						final DiskFileUpload fileUpload = (DiskFileUpload) data;
    +						checkState(fileUpload.isCompleted());
    +
    +						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
    +						fileUpload.renameTo(dest.toFile());
    +					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +						final Attribute request = (Attribute) data;
    +						// this could also be implemented by using the first found Attribute as the payload
    +						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +							currentJsonPayload = request.get();
    +						} else {
    +							LOG.warn("Received unknown attribute {}.", data.getName());
    +							HandlerUtils.sendErrorResponse(
    +								ctx,
    +								currentHttpRequest,
    +								new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
    +								HttpResponseStatus.BAD_REQUEST,
    +								Collections.emptyMap()
    +							);
    +							deleteUploadedFiles();
    +							reset();
    +							return;
    +						}
    +					}
    +				}
    +
    +				if (httpContent instanceof LastHttpContent) {
    +					ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    +					ctx.fireChannelRead(currentHttpRequest);
    +					if (currentJsonPayload != null) {
    +						ctx.fireChannelRead(httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
    +					} else {
    +						ctx.fireChannelRead(httpContent);
    +					}
    +					reset();
    +				}
     			} else {
     				ctx.fireChannelRead(msg);
     			}
    -		} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
    -			// make sure that we still have a upload dir in case that it got deleted in the meanwhile
    -			RestServerEndpoint.createUploadDir(uploadDir, LOG);
    -
    -			final HttpContent httpContent = (HttpContent) msg;
    -			currentHttpPostRequestDecoder.offer(httpContent);
    -
    -			while (currentHttpPostRequestDecoder.hasNext()) {
    -				final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
    -				if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
    -					final DiskFileUpload fileUpload = (DiskFileUpload) data;
    -					checkState(fileUpload.isCompleted());
    -
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    -					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    -				}
    -			}
    +		} catch (Exception e) {
    +			LOG.warn("Internal server error. File upload failed.", e);
    +			HandlerUtils.sendErrorResponse(
    +				ctx,
    +				currentHttpRequest,
    +				new ErrorResponseBody("File upload failed."),
    +				HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +				Collections.emptyMap()
    +			);
    +			deleteUploadedFiles();
    +			reset();
    +		}
    +	}
     
    -			if (httpContent instanceof LastHttpContent) {
    -				ctx.fireChannelRead(currentHttpRequest);
    -				ctx.fireChannelRead(httpContent);
    -				reset();
    +	private void deleteUploadedFiles() {
    +		if (currentUploadDir != null) {
    +			try (FileUploads uploads = new FileUploads(Collections.singleton(currentUploadDir))) {
    --- End diff --
    
    The idea was to define the cleanup logic in one place, in this case in the `FileUploads` class.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196453584
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		final Collection<Path> directories = new ArrayList<>(1);
    +		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +			if (Files.isDirectory(fileOrDirectory)) {
    +				directories.add(fileOrDirectory);
    +				FileAdderVisitor visitor = new FileAdderVisitor();
    +				Files.walkFileTree(fileOrDirectory, visitor);
    +				files.addAll(visitor.get());
    +			} else {
    +				files.add(fileOrDirectory);
    +			}
    +		}
    +		directoriesToClean = Collections.unmodifiableCollection(directories);
    +		uploadedFiles = Collections.unmodifiableCollection(files);
    --- End diff --
    
    Let's move this logic out of `FileUploads` and simply initialize it with a `Collection<File>`.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196689843
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
     		currentHttpPostRequestDecoder.destroy();
     		currentHttpPostRequestDecoder = null;
     		currentHttpRequest = null;
    +		currentUploadDir = null;
    +		currentJsonPayload = null;
    +	}
    +
    +	public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +	}
    +
    +	public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
    +			.orElse(FileUploads.EMPTY);
    --- End diff --
    
    True. In that case I would suggest storing only the directory in `FileUploads` and adding a method to retrieve all uploaded files.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196559335
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest {
    +
    +	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    +
    +	private static RestServerEndpoint serverEndpoint;
    +	private static String serverAddress;
    +
    +	private static MultipartMixedHandler mixedHandler;
    +	private static MultipartJsonHandler jsonHandler;
    +	private static MultipartFileHandler fileHandler;
    +	private static File file1;
    +	private static File file2;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(RestOptions.PORT, 0);
    +		config.setString(RestOptions.ADDRESS, "localhost");
    +		config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
    --- End diff --
    
    You could try using the `TestingRestfulGateway` to avoid Mockito.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197053400
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.router.RouteResult;
    +import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +
    +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    +import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
    +import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Collections;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for {@link AbstractHandler}.
    + */
    +public class AbstractHandlerTest {
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	@Test
    +	public void testFileCleanup() throws Exception {
    +		final Path file = temporaryFolder.newFile().toPath();
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
    +			.setRestAddress(restAddress)
    +			.build();
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		RouteResult<?> routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), "");
    +		HttpRequest request = new DefaultFullHttpRequest(
    +			HttpVersion.HTTP_1_1,
    +			HttpMethod.GET,
    +			TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
    +			Unpooled.wrappedBuffer(new byte[0]));
    +		RoutedRequest<?> routerRequest = new RoutedRequest<>(routeResult, request);
    +
    +		Attribute<FileUploads> attribute = new SimpleAttribute();
    +		attribute.set(new FileUploads(Collections.emptyList(), Collections.singleton(file)));
    +		Channel channel = mock(Channel.class);
    +		when(channel.attr(any(AttributeKey.class))).thenReturn(attribute);
    +
    +		ChannelHandlerContext context = mock(ChannelHandlerContext.class);
    +		when(context.channel()).thenReturn(channel);
    --- End diff --
    
    Are you referring to the one in `AbstractTaskManagerFileHandlerTest`? If so, I don't really see a difference between mocking 1 method and having an "implementation" that only really implements a minuscule subset.
    
    I initially implemented both the `Channel` and `ChannelHandlerContext` with the same behavior as the mocks, but all that did was add 50+ methods that are just noise.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197054770
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testUploadCleanupOnFailure() throws IOException {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(request).execute()) {
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +		assertUploadDirectoryIsEmpty();
    +	}
    +
    +	private static void assertUploadDirectoryIsEmpty() throws IOException {
    +		Preconditions.checkArgument(
    +			1 == Files.list(configuredUploadDir).count(),
    +			"Directory structure in rest upload directory has changed. Test must be adjusted");
    +		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
    +		Preconditions.checkArgument(
    +			actualUploadDir.isPresent(),
    +			"Expected upload directory does not exist.");
    +		System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
    +		assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count());
    --- End diff --
    
    This test wasn't covering the case of exceptions, only the rejection of unknown attributes. I will try to find a way to crash the handler.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196129193
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +	private final Collection<Path> directoriesToClean;
    +	private final Collection<Path> uploadedFiles;
    +
    +	@SuppressWarnings("resource")
    +	public static final FileUploads EMPTY = new FileUploads();
    +
    +	private FileUploads() {
    +		this.directoriesToClean = Collections.emptyList();
    +		this.uploadedFiles = Collections.emptyList();
    +	}
    +
    +	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +		final Collection<Path> files = new ArrayList<>(4);
    +		final Collection<Path> directories = new ArrayList<>(1);
    +		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +			if (Files.isDirectory(fileOrDirectory)) {
    +				directories.add(fileOrDirectory);
    +				FileAdderVisitor visitor = new FileAdderVisitor();
    --- End diff --
    
    This is probably more complex than really necessary; there is no use-case for a _nested_ directory structure.
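
A minimal sketch of the flat alternative, assuming all uploads land directly inside one per-request directory. `FlatUploadListing` and `listUploadedFiles` are hypothetical names that do not appear in the PR; only standard `java.nio.file` calls are used.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical helper for illustration; not part of the PR. */
final class FlatUploadListing {

    private FlatUploadListing() {
    }

    /** Returns the regular files located directly in uploadDir, without descending into subdirectories. */
    static Collection<Path> listUploadedFiles(Path uploadDir) throws IOException {
        final Collection<Path> files = new ArrayList<>(4);
        // A DirectoryStream only iterates the direct children, so no FileVisitor is needed.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(uploadDir)) {
            for (Path entry : entries) {
                if (Files.isRegularFile(entry)) {
                    files.add(entry);
                }
            }
        }
        return Collections.unmodifiableCollection(files);
    }
}
```

Anything placed in a subdirectory is deliberately ignored here, which is only acceptable if the handler never creates nested directories.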


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196453235
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     					final DiskFileUpload fileUpload = (DiskFileUpload) data;
     					checkState(fileUpload.isCompleted());
     
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    +					final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
     					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    +				} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +					final Attribute request = (Attribute) data;
    +					// this could also be implemented by using the first found Attribute as the payload
    +					if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +						currentJsonPayload = request.get();
    +					} else {
    +						LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    +					}
     				}
     			}
     
     			if (httpContent instanceof LastHttpContent) {
    +				ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    --- End diff --
    
    I would suggest simply storing the upload directory in the `UPLOADED_FILES` attribute.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196701452
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
     		currentHttpPostRequestDecoder.destroy();
     		currentHttpPostRequestDecoder = null;
     		currentHttpRequest = null;
    +		currentUploadDir = null;
    +		currentJsonPayload = null;
    +	}
    +
    +	public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +	}
    +
    +	public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
    +			.orElse(FileUploads.EMPTY);
    --- End diff --
    
    I think not much. My gut feeling is just that `FileUploads` can be simplified. Instead of having our own `FileVisitor`, we could simply call `FileUtils.deleteDirectory(uploadDirectory)`. I also think this class actually has two responsibilities: listing all files to make them accessible, and storing the directories in which they reside so that they can be deleted afterwards.
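
A minimal sketch of the simplification suggested above, assuming a single upload directory per request. `SimpleFileUploads` is a hypothetical name, not the class in this PR, and plain JDK calls stand in for Flink's `FileUtils.deleteDirectory` so that the example is self-contained:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical container that only remembers the per-request upload directory. */
final class SimpleFileUploads implements AutoCloseable {

    /** May be null, meaning the request carried no uploads. */
    private final Path uploadDirectory;

    SimpleFileUploads(Path uploadDirectory) {
        this.uploadDirectory = uploadDirectory;
    }

    /** Lists all regular files below the upload directory. */
    Collection<Path> getUploadedFiles() throws IOException {
        if (uploadDirectory == null) {
            return Collections.emptyList();
        }
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            return paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    /** Deletes the upload directory and everything in it (JDK stand-in for a recursive delete utility). */
    @Override
    public void close() throws IOException {
        if (uploadDirectory == null || !Files.exists(uploadDirectory)) {
            return;
        }
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            // Reverse order puts children before their parent directories.
            deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : deepestFirst) {
            Files.delete(path);
        }
    }
}
```

With this shape, listing and cleanup both derive from one field, so the container no longer needs to track files and directories separately.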


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196741065
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,483 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest extends TestLogger {
    +
    +	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    +
    +	private static RestServerEndpoint serverEndpoint;
    +	private static String serverAddress;
    +
    +	private static MultipartMixedHandler mixedHandler;
    +	private static MultipartJsonHandler jsonHandler;
    +	private static MultipartFileHandler fileHandler;
    +	private static File file1;
    +	private static File file2;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(RestOptions.PORT, 0);
    +		config.setString(RestOptions.ADDRESS, "localhost");
    +		config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
    +			.setRestAddress(restAddress)
    +			.build();
    +
    +		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
    +			CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +		file1 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +		file2 = TEMPORARY_FOLDER.newFile();
    +		Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +		mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +		fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
    +
    +		final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList(
    +			Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler),
    +			Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
    +			Tuple2.of(fileHandler.getMessageHeaders(), fileHandler));
    +
    +		serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers);
    +
    +		serverEndpoint.start();
    +		serverAddress = serverEndpoint.getRestBaseUrl();
    +	}
    +
    +	@AfterClass
    +	public static void teardown() throws Exception {
    +		if (serverEndpoint != null) {
    +			serverEndpoint.close();
    +			serverEndpoint = null;
    +		}
    +	}
    +
    +	private static Request buildFileRequest(String headerUrl) {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildJsonRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request buildMixedRequest(String headerUrl, int index) throws IOException {
    +		MultipartBody.Builder builder = new MultipartBody.Builder();
    +		builder = addJsonPart(builder, index);
    +		builder = addFilePart(builder);
    +		return finalizeRequest(builder, headerUrl);
    +	}
    +
    +	private static Request finalizeRequest(MultipartBody.Builder builder, String headerUrl) {
    +		MultipartBody multipartBody = builder
    +			.setType(MultipartBody.FORM)
    +			.build();
    +
    +		return new Request.Builder()
    +			.url(serverAddress + headerUrl)
    +			.post(multipartBody)
    +			.build();
    +	}
    +
    +	private static MultipartBody.Builder addFilePart(MultipartBody.Builder builder) {
    +		okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
    +		okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
    +
    +		return builder.addFormDataPart("file1", file1.getName(), filePayload1)
    +			.addFormDataPart("file2", file2.getName(), filePayload2);
    +	}
    +
    +	private static MultipartBody.Builder addJsonPart(MultipartBody.Builder builder, int index) throws IOException {
    +		TestRequestBody jsonRequestBody = new TestRequestBody(index);
    +
    +		StringWriter sw = new StringWriter();
    +		OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
    +
    +		String jsonPayload = sw.toString();
    +
    +		return builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST, jsonPayload);
    +	}
    +
    +	@Test
    +	public void testMixedMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// explicitly rejected by the test handler implementation
    +			assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// expected JSON payload is missing
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		int mixedId = RANDOM.nextInt();
    +		Request mixedRequest = buildMixedRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), mixedId);
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			assertEquals(mixedHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(mixedId, mixedHandler.lastReceivedRequest.index);
    +		}
    +	}
    +
    +	@Test
    +	public void testJsonMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		int jsonId = RANDOM.nextInt();
    +		Request jsonRequest = buildJsonRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), jsonId);
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			assertEquals(jsonHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +			assertEquals(jsonId, jsonHandler.lastReceivedRequest.index);
    +		}
    +
    +		Request fileRequest = buildFileRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			// either because JSON payload is missing or FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(jsonHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// FileUploads are outright forbidden
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    +
    +	@Test
    +	public void testFileMultipart() throws Exception {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request jsonRequest = buildJsonRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(jsonRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +
    +		Request fileRequest = buildFileRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(fileRequest).execute()) {
    +			assertEquals(fileHandler.getMessageHeaders().getResponseStatusCode().code(), response.code());
    +		}
    +
    +		Request mixedRequest = buildMixedRequest(fileHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt());
    +		try (Response response = client.newCall(mixedRequest).execute()) {
    +			// JSON payload did not match expected format
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +	}
    --- End diff --
    
    Can we add a test which ensures that the upload directory gets deleted if `FileUploadHandler` fails?


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196681937
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest {
    +
    +	private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper();
    +	private static final Random RANDOM = new Random();
    +
    +	@ClassRule
    +	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    +
    +	private static RestServerEndpoint serverEndpoint;
    +	private static String serverAddress;
    +
    +	private static MultipartMixedHandler mixedHandler;
    +	private static MultipartJsonHandler jsonHandler;
    +	private static MultipartFileHandler fileHandler;
    +	private static File file1;
    +	private static File file2;
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setInteger(RestOptions.PORT, 0);
    +		config.setString(RestOptions.ADDRESS, "localhost");
    +		config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +		final String restAddress = "http://localhost:1234";
    +		RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
    --- End diff --
    
    neat.


---

[GitHub] flink issue #6178: [FLINK-9599][rest] Implement generic mechanism to access ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6178
  
    Checkstyle errors:
    ```
    18:31:10.780 [INFO] There are 2 errors reported by Checkstyle 8.4 with /tools/maven/checkstyle.xml ruleset.
    18:31:10.788 [ERROR] src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java:[56] (regexp) RegexpSingleline: Trailing whitespace
    18:31:10.804 [ERROR] src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java:[99] (regexp) RegexpSingleline: Trailing whitespace
    ```


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197051395
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testUploadCleanupOnFailure() throws IOException {
    +		OkHttpClient client = new OkHttpClient();
    +
    +		Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
    +		try (Response response = client.newCall(request).execute()) {
    +			assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code());
    +		}
    +		assertUploadDirectoryIsEmpty();
    +	}
    +
    +	private static void assertUploadDirectoryIsEmpty() throws IOException {
    +		Preconditions.checkArgument(
    +			1 == Files.list(configuredUploadDir).count(),
    +			"Directory structure in rest upload directory has changed. Test must be adjusted");
    +		Optional<Path> actualUploadDir = Files.list(configuredUploadDir).findAny();
    +		Preconditions.checkArgument(
    +			actualUploadDir.isPresent(),
    +			"Expected upload directory does not exist.");
    +		System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
    --- End diff --
    
    Remove the `System.out.println` call.
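
One possible way to keep the diagnostic value without printing to stdout is to put the directory listing into the assertion failure message. This is only a sketch: `UploadDirAssertions` and its method names are hypothetical and not part of the PR. It also closes the `Files.list` stream, which the JDK documentation recommends doing via try-with-resources:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical test helper; names are illustrative only. */
final class UploadDirAssertions {

    private UploadDirAssertions() {
    }

    /** Returns the direct children of the given directory, closing the underlying stream. */
    static List<Path> listDirectory(Path directory) throws IOException {
        try (Stream<Path> children = Files.list(directory)) {
            return children.collect(Collectors.toList());
        }
    }

    /** Fails with the remaining entries in the message instead of printing them. */
    static void assertDirectoryIsEmpty(Path directory) throws IOException {
        List<Path> remaining = listDirectory(directory);
        if (!remaining.isEmpty()) {
            throw new AssertionError(
                "Expected " + directory + " to be empty, but found: " + remaining);
        }
    }
}
```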


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196699736
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
     	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
     		if (msg instanceof HttpRequest) {
     			final HttpRequest httpRequest = (HttpRequest) msg;
    +			LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
     			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
     				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
     					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
     					currentHttpRequest = httpRequest;
    +					currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
    --- End diff --
    
    I think we should add this.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196667205
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     					final DiskFileUpload fileUpload = (DiskFileUpload) data;
     					checkState(fileUpload.isCompleted());
     
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    +					final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
     					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    +				} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +					final Attribute request = (Attribute) data;
    +					// this could also be implemented by using the first found Attribute as the payload
    +					if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +						currentJsonPayload = request.get();
    +					} else {
    +						LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    +					}
     				}
     			}
     
     			if (httpContent instanceof LastHttpContent) {
    +				ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    +				ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
    --- End diff --
    
    I can try this (it would be neat to remove the special case in `AbstractHandler`), but I'm wondering whether we can "simply" replace the payload of the multipart request (as identified by the headers that we also forward) with plain JSON.


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196559093
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest {
    --- End diff --
    
    Missing `extends TestLogger`


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196712002
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
     					final DiskFileUpload fileUpload = (DiskFileUpload) data;
     					checkState(fileUpload.isCompleted());
     
    -					final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -						"_" + fileUpload.getFilename()));
    +					final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
     					fileUpload.renameTo(dest.toFile());
    -					ctx.channel().attr(UPLOADED_FILE).set(dest);
    +				} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +					final Attribute request = (Attribute) data;
    +					// this could also be implemented by using the first found Attribute as the payload
    +					if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +						currentJsonPayload = request.get();
    +					} else {
    +						LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    +					}
     				}
     			}
     
     			if (httpContent instanceof LastHttpContent) {
    +				ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    +				ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
    --- End diff --
    
    well look at that, it _actually works_


---

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196452583
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
     		currentHttpPostRequestDecoder.destroy();
     		currentHttpPostRequestDecoder = null;
     		currentHttpRequest = null;
    +		currentUploadDir = null;
    +		currentJsonPayload = null;
    +	}
    +
    +	public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +	}
    +
    +	public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
    +		return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
    +			.orElse(FileUploads.EMPTY);
    --- End diff --
    
    I would suggest simply returning the upload directory.


---

[GitHub] flink issue #6178: [FLINK-9599][rest] Implement generic mechanism to access ...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6178
  
    @tillrohrmann I believe I've addressed all your comments.


---