You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/11 21:32:35 UTC

[1/6] flink git commit: [FLINK-7807] [REST] Log exceptions in HandlerUtils methods

Repository: flink
Updated Branches:
  refs/heads/master eef0db090 -> b2cac3a2a


[FLINK-7807] [REST] Log exceptions in HandlerUtils methods

This closes #7807.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9c13c7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9c13c7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9c13c7d

Branch: refs/heads/master
Commit: a9c13c7d90253cc59749c20e6ee6e7f790cb598a
Parents: e2ae45b
Author: zentol <ch...@apache.org>
Authored: Wed Oct 11 13:43:40 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 22:06:58 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rest/handler/util/HandlerUtils.java   | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9c13c7d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index 0d7483a..7c37998 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -36,6 +36,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
@@ -51,6 +53,8 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVer
  */
 public class HandlerUtils {
 
+	private static final Logger LOG = LoggerFactory.getLogger(HandlerUtils.class);
+
 	private static final ObjectMapper mapper = RestMapperUtils.getStrictObjectMapper();
 
 	/**
@@ -71,6 +75,7 @@ public class HandlerUtils {
 		try {
 			mapper.writeValue(sw, response);
 		} catch (IOException ioe) {
+			LOG.error("Internal server error. Could not map response to JSON.", ioe);
 			sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
 			return;
 		}
@@ -96,6 +101,7 @@ public class HandlerUtils {
 			mapper.writeValue(sw, errorMessage);
 		} catch (IOException e) {
 			// this should never happen
+			LOG.error("Internal server error. Could not map error response to JSON.", e);
 			sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
 		}
 		sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode);


[4/6] flink git commit: [hotfix][doc] Remove outdated best-practice suggestion to use .withParameters()

Posted by ch...@apache.org.
[hotfix][doc] Remove outdated best-practice suggestion to use .withParameters()

This way of passing parameters does not work with the streaming API and
is actually confusing for users.

This closes #4797.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/371ec9d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/371ec9d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/371ec9d6

Branch: refs/heads/master
Commit: 371ec9d6f02b031e7bb1a520b49c39833a394973
Parents: a9c13c7
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 11 12:10:13 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 22:06:59 2017 +0200

----------------------------------------------------------------------
 docs/dev/best_practices.md | 22 ----------------------
 1 file changed, 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/371ec9d6/docs/dev/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/dev/best_practices.md b/docs/dev/best_practices.md
index 2a1d32e..24e7091 100644
--- a/docs/dev/best_practices.md
+++ b/docs/dev/best_practices.md
@@ -111,28 +111,6 @@ DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters)
 
 and then use it inside the function for getting values from the command line.
 
-
-#### Passing parameters as a `Configuration` object to single functions
-
-The example below shows how to pass the parameters as a `Configuration` object to a user defined function.
-
-{% highlight java %}
-ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text
-        .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
-{% endhighlight %}
-
-In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
-
-{% highlight java %}
-public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
-    @Override
-    public void open(Configuration parameters) throws Exception {
-	parameters.getInteger("myInt", -1);
-	// .. do
-{% endhighlight %}
-
-
 #### Register the parameters globally
 
 Parameters registered as global job parameters in the `ExecutionConfig` can be accessed as configuration values from the JobManager web interface and in all functions defined by the user.


[5/6] flink git commit: [hotfix] [Javadoc] Fix typo in Javadoc for class InputTypeConfigurable

Posted by ch...@apache.org.
[hotfix] [Javadoc] Fix typo in Javadoc for class InputTypeConfigurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc3f8390
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc3f8390
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc3f8390

Branch: refs/heads/master
Commit: dc3f839063fe4afcdbb2334bdecc9f5f9f9545a6
Parents: 371ec9d
Author: gyao <ga...@data-artisans.com>
Authored: Tue Oct 10 16:26:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 22:06:59 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/java/typeutils/InputTypeConfigurable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dc3f8390/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
index b7fc2e2..8f749a9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this interface to be configured
- * with the data type they will operate on. The method {@link #setInputType(TypeInformation, ExecutionConfig)
+ * with the data type they will operate on. The method {@link #setInputType(TypeInformation, ExecutionConfig)}
  * will be called when the output format is used with an output API method.
  */
 @Public


[2/6] flink git commit: [FLINK-7661][network] Add credit field in PartitionRequest message

Posted by ch...@apache.org.
[FLINK-7661][network] Add credit field in PartitionRequest message

This closes #4698.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891f359d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891f359d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891f359d

Branch: refs/heads/master
Commit: 891f359d710146acf3d05cd2af3bb430a8fbc99b
Parents: eef0db0
Author: Zhijiang <wa...@aliyun.com>
Authored: Thu Sep 21 16:28:16 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 22:06:58 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/netty/NettyMessage.java     | 11 ++++++++---
 .../runtime/io/network/netty/PartitionRequestClient.java |  2 +-
 .../network/partition/consumer/RemoteInputChannel.java   |  4 ++++
 .../io/network/netty/CancelPartitionRequestTest.java     |  4 ++--
 .../io/network/netty/NettyMessageSerializationTest.java  |  3 ++-
 .../network/netty/ServerTransportErrorHandlingTest.java  |  2 +-
 6 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index d7ddfa6..c035010 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -358,10 +358,13 @@ abstract class NettyMessage {
 
 		final InputChannelID receiverId;
 
-		PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
+		final int credit;
+
+		PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId, int credit) {
 			this.partitionId = checkNotNull(partitionId);
 			this.queueIndex = queueIndex;
 			this.receiverId = checkNotNull(receiverId);
+			this.credit = credit;
 		}
 
 		@Override
@@ -369,12 +372,13 @@ abstract class NettyMessage {
 			ByteBuf result = null;
 
 			try {
-				result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16);
+				result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16 + 4);
 
 				partitionId.getPartitionId().writeTo(result);
 				partitionId.getProducerId().writeTo(result);
 				result.writeInt(queueIndex);
 				receiverId.writeTo(result);
+				result.writeInt(credit);
 
 				return result;
 			}
@@ -394,8 +398,9 @@ abstract class NettyMessage {
 					ExecutionAttemptID.fromByteBuf(buffer));
 			int queueIndex = buffer.readInt();
 			InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
+			int credit = buffer.readInt();
 
-			return new PartitionRequest(partitionId, queueIndex, receiverId);
+			return new PartitionRequest(partitionId, queueIndex, receiverId, credit);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 7850974..8dbc6b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -106,7 +106,7 @@ public class PartitionRequestClient {
 		partitionRequestHandler.addInputChannel(inputChannel);
 
 		final PartitionRequest request = new PartitionRequest(
-				partitionId, subpartitionIndex, inputChannel.getInputChannelId());
+				partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());
 
 		final ChannelFutureListener listener = new ChannelFutureListener() {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4e1eaef..4c156df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -382,6 +382,10 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		return id;
 	}
 
+	public int getInitialCredit() {
+		return initialCredit;
+	}
+
 	public BufferProvider getBufferProvider() throws IOException {
 		if (isReleased.get()) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 12f5064..912fae2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -97,7 +97,7 @@ public class CancelPartitionRequestTest {
 			Channel ch = connect(serverAndClient);
 
 			// Request for non-existing input channel => results in cancel request
-			ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID())).await();
+			ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID(), 2)).await();
 
 			// Wait for the notification
 			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
@@ -150,7 +150,7 @@ public class CancelPartitionRequestTest {
 			// Request for non-existing input channel => results in cancel request
 			InputChannelID inputChannelId = new InputChannelID();
 
-			ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId)).await();
+			ch.writeAndFlush(new PartitionRequest(pid, 0, inputChannelId, 2)).await();
 
 			// Wait for the notification
 			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 8200caa..0651f97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -126,12 +126,13 @@ public class NettyMessageSerializationTest {
 		}
 
 		{
-			NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID());
+			NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID()), random.nextInt(), new InputChannelID(), random.nextInt());
 			NettyMessage.PartitionRequest actual = encodeAndDecode(expected);
 
 			assertEquals(expected.partitionId, actual.partitionId);
 			assertEquals(expected.queueIndex, actual.queueIndex);
 			assertEquals(expected.receiverId, actual.receiverId);
+			assertEquals(expected.credit, actual.credit);
 		}
 
 		{

http://git-wip-us.apache.org/repos/asf/flink/blob/891f359d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 01a0b5f..d365fba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -106,7 +106,7 @@ public class ServerTransportErrorHandlingTest {
 			Channel ch = connect(serverAndClient);
 
 			// Write something to trigger close by server
-			ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
+			ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID(), 2));
 
 			// Wait for the notification
 			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {


[3/6] flink git commit: [FLINK-7808] [REST] JobDetails constructor checks size of tasksPerState argument

Posted by ch...@apache.org.
[FLINK-7808] [REST] JobDetails constructor checks size of tasksPerState argument

This closes #4800.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2ae45b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2ae45b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2ae45b4

Branch: refs/heads/master
Commit: e2ae45b48345cf56501530e101f3c8523448ab79
Parents: 891f359
Author: zentol <ch...@apache.org>
Authored: Wed Oct 11 13:47:38 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 22:06:58 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/messages/webmonitor/JobDetails.java  | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2ae45b4/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
index 2aca75b..16dfa51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.messages.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
@@ -92,6 +93,8 @@ public class JobDetails implements Serializable {
 		this.duration = duration;
 		this.status = checkNotNull(status);
 		this.lastUpdateTime = lastUpdateTime;
+		Preconditions.checkArgument(tasksPerState.length == ExecutionState.values().length, 
+			"tasksPerState argument must be of size {}.", ExecutionState.values().length);
 		this.tasksPerState = checkNotNull(tasksPerState);
 		this.numTasks = numTasks;
 	}


[6/6] flink git commit: [hotfix] [Javadoc] Fix typo in Javadoc for class FileSystem

Posted by ch...@apache.org.
[hotfix] [Javadoc] Fix typo in Javadoc for class FileSystem

This closes #4791.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b2cac3a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b2cac3a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b2cac3a2

Branch: refs/heads/master
Commit: b2cac3a2a3dbc2bf0206f9fcdd8bc7ccc95a9486
Parents: dc3f839
Author: gyao <ga...@data-artisans.com>
Authored: Tue Oct 10 16:27:05 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Oct 11 22:06:59 2017 +0200

----------------------------------------------------------------------
 flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b2cac3a2/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index a6c9b50..643ca9b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -493,7 +493,7 @@ public abstract class FileSystem {
 	 *
 	 * @param f
 	 *        given path
-	 * @return the statuses of the files/directories in the given patch
+	 * @return the statuses of the files/directories in the given path
 	 * @throws IOException
 	 */
 	public abstract FileStatus[] listStatus(Path f) throws IOException;