You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hs...@apache.org on 2015/03/06 01:31:45 UTC

flink git commit: Some simple cleanups and doc updates while looking at (mostly) runtime package

Repository: flink
Updated Branches:
  refs/heads/master 8f321c729 -> 27d1e7f81


Some simple cleanups and doc updates while looking at (mostly) runtime package

This PR consists of:

1. Remove unnecessary brackets in ExecutionVertex#getPreferredLocations
2. Throw illegal argument with proper message for consistencies of not nullable argument.
3. Add final modifier to RecordWriter#serializers since the content being used as lock.
4. Wrap too long of lines in some of the Java code for readibility.
5. Add missing JavaDoc parameter.
6. Remove final modifier in the OutputEmitter's private methods because it is redundant.

Author: Henry Saputra <he...@gmail.com>

Closes #457 from hsaputra/cleanup_javadoc_and_longlines_1 and squashes the following commits:

8815302 [Henry Saputra] Some cleanups and doc updates while looking at runtime package: 1. Remove unnecessary brackets in ExecutionVertex#getPreferredLocations 2. Throw illegal argument with proper message for consistencies of not nullable argument. 3. Add final modifier to RecordWriter#serializers since the content being used as lock. 4. Wrap too long of lines in some of the Java code for readibility. 5. Add missing JavaDoc parameter. 6. Remove final modifier in the OutputEmitter's private methods because it is redundant.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27d1e7f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27d1e7f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27d1e7f8

Branch: refs/heads/master
Commit: 27d1e7f8123fb69367fa3ea668ae6d8af436d8b4
Parents: 8f321c7
Author: Henry Saputra <he...@gmail.com>
Authored: Thu Mar 5 16:31:35 2015 -0800
Committer: Henry Saputra <he...@gmail.com>
Committed: Thu Mar 5 16:31:35 2015 -0800

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java     |  2 +-
 .../runtime/executiongraph/ExecutionVertex.java     |  8 +++-----
 .../runtime/io/network/api/writer/RecordWriter.java |  2 +-
 .../flink/runtime/io/network/buffer/Buffer.java     |  3 ++-
 .../io/network/partition/consumer/InputChannel.java |  3 ++-
 .../network/partition/consumer/SingleInputGate.java | 16 ++++++++++------
 .../iterative/task/AbstractIterativePactTask.java   |  2 +-
 .../flink/runtime/operators/RegularPactTask.java    |  2 +-
 .../runtime/operators/shipping/OutputEmitter.java   | 12 ++++++------
 .../flink/runtime/util/EnvironmentInformation.java  |  1 +
 .../flink/runtime/taskmanager/TaskManager.scala     |  3 +--
 11 files changed, 29 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index bd76089..57ed4c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -215,7 +215,7 @@ public class Execution implements Serializable {
 	 */
 	public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
 		if (scheduler == null) {
-			throw new NullPointerException();
+			throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
 		}
 
 		final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e83154a..0158fbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -328,11 +328,9 @@ public class ExecutionVertex implements Serializable {
 	 */
 	public Iterable<Instance> getPreferredLocations() {
 		// if we have hard location constraints, use those
-		{
-			List<Instance> constraintInstances = this.locationConstraintInstances;
-			if (constraintInstances != null && !constraintInstances.isEmpty()) {
-				return constraintInstances;
-			}
+		List<Instance> constraintInstances = this.locationConstraintInstances;
+		if (constraintInstances != null && !constraintInstances.isEmpty()) {
+			return constraintInstances;
 		}
 		
 		// otherwise, base the preferred locations on the input connections

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index ae87aff..b29d4da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -50,7 +50,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	private final int numChannels;
 
 	/** {@link RecordSerializer} per outgoing channel */
-	private RecordSerializer<T>[] serializers;
+	private final RecordSerializer<T>[] serializers;
 
 	public RecordWriter(BufferWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>());

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 24cd106..0ca9562 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -94,7 +94,8 @@ public class Buffer {
 		synchronized (recycleLock) {
 			ensureNotRecycled();
 
-			checkArgument(newSize >= 0 && newSize <= memorySegment.size(), "Size of buffer must be >= 0 and <= " + memorySegment.size() + ", but was " + newSize + ".");
+			checkArgument(newSize >= 0 && newSize <= memorySegment.size(), "Size of buffer must be >= 0 and <= " +
+					memorySegment.size() + ", but was " + newSize + ".");
 
 			currentSize = newSize;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 7173566..fb41549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -46,7 +46,8 @@ public abstract class InputChannel {
 
 	protected final SingleInputGate inputGate;
 
-	protected InputChannel(SingleInputGate inputGate, int channelIndex, ExecutionAttemptID producerExecutionId, IntermediateResultPartitionID partitionId) {
+	protected InputChannel(SingleInputGate inputGate, int channelIndex, ExecutionAttemptID producerExecutionId,
+			IntermediateResultPartitionID partitionId) {
 		this.inputGate = inputGate;
 		this.channelIndex = channelIndex;
 		this.producerExecutionId = producerExecutionId;

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 19898c6..d981451 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -185,8 +185,8 @@ public class SingleInputGate implements InputGate {
 
 	public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
 		synchronized (requestLock) {
-			if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null && inputChannel.getClass() == UnknownInputChannel.class) {
-
+			if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null &&
+					inputChannel.getClass() == UnknownInputChannel.class) {
 				numberOfUninitializedChannels++;
 			}
 		}
@@ -381,21 +381,25 @@ public class SingleInputGate implements InputGate {
 				case LOCAL:
 					LOG.debug("Create LocalInputChannel for {}.", partition);
 
-					inputChannels[channelIndex] = new LocalInputChannel(reader, channelIndex, producerExecutionId, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher());
+					inputChannels[channelIndex] = new LocalInputChannel(reader, channelIndex, producerExecutionId, partitionId,
+							networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher());
 
 					break;
 				case REMOTE:
 					LOG.debug("Create RemoteInputChannel for {}.", partition);
 
-					final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(), "Missing producer address for remote intermediate result partition.");
+					final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(),
+							"Missing producer address for remote intermediate result partition.");
 
-					inputChannels[channelIndex] = new RemoteInputChannel(reader, channelIndex, producerExecutionId, partitionId, producerAddress, networkEnvironment.getConnectionManager());
+					inputChannels[channelIndex] = new RemoteInputChannel(reader, channelIndex, producerExecutionId, partitionId,
+							producerAddress, networkEnvironment.getConnectionManager());
 
 					break;
 				case UNKNOWN:
 					LOG.debug("Create UnknownInputChannel for {}.", partition);
 
-					inputChannels[channelIndex] = new UnknownInputChannel(reader, channelIndex, producerExecutionId, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager());
+					inputChannels[channelIndex] = new UnknownInputChannel(reader, channelIndex, producerExecutionId, partitionId,
+							networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager());
 
 					break;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index d213069..9c5fdca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -54,7 +54,7 @@ import org.apache.flink.util.MutableObjectIterator;
 import java.io.IOException;
 
 /**
- * The base class for all tasks able to participate in an iteration.
+ * The abstract base class for all tasks able to participate in an iteration.
  */
 public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
 		implements Terminable

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 38b71a1..3deac6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -76,7 +76,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * The abstract base class for all tasks. Encapsulated common behavior and implements the main life-cycle
+ * The base class for all tasks. Encapsulated common behavior and implements the main life-cycle
  * of the user code.
  */
 public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index ff1a148..30fec48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -143,7 +143,7 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 	
 	// --------------------------------------------------------------------------------------------
 
-	private final int[] robin(int numberOfChannels) {
+	private int[] robin(int numberOfChannels) {
 		if (this.channels == null || this.channels.length != 1) {
 			this.channels = new int[1];
 		}
@@ -156,7 +156,7 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		return this.channels;
 	}
 
-	private final int[] broadcast(int numberOfChannels) {
+	private int[] broadcast(int numberOfChannels) {
 		if (channels == null || channels.length != numberOfChannels) {
 			channels = new int[numberOfChannels];
 			for (int i = 0; i < numberOfChannels; i++) {
@@ -167,7 +167,7 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		return channels;
 	}
 
-	private final int[] hashPartitionDefault(T record, int numberOfChannels) {
+	private int[] hashPartitionDefault(T record, int numberOfChannels) {
 		if (channels == null || channels.length != 1) {
 			channels = new int[1];
 		}
@@ -189,7 +189,7 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		return this.channels;
 	}
 
-	private final int murmurHash(int k) {
+	private int murmurHash(int k) {
 		k *= 0xcc9e2d51;
 		k = Integer.rotateLeft(k, 15);
 		k *= 0x1b873593;
@@ -207,11 +207,11 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 		return k;
 	}
 
-	private final int[] rangePartition(T record, int numberOfChannels) {
+	private int[] rangePartition(T record, int numberOfChannels) {
 		throw new UnsupportedOperationException();
 	}
 	
-	private final int[] customPartition(T record, int numberOfChannels) {
+	private int[] customPartition(T record, int numberOfChannels) {
 		if (channels == null) {
 			channels = new int[1];
 			extractedKeys = new Object[1];

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 793e158..4efdf11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -207,6 +207,7 @@ public class EnvironmentInformation {
 	 *
 	 * @param log The logger to log the information to.
 	 * @param componentName The component name to mention in the log.
+	 * @param commandLineArgs The arguments accompanying the starting the component.
 	 */
 	public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
 		if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27d1e7f8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 268f6e3..a48fc8d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -613,8 +613,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
     }
 
     try {
-      networkEnvironment = Some(new NetworkEnvironment(self, jobManager, timeout,
-        networkConfig))
+      networkEnvironment = Some(new NetworkEnvironment(self, jobManager, timeout, networkConfig))
     } catch {
       case ioe: IOException =>
         log.error(ioe, "Failed to instantiate network environment.")