You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/01 09:44:50 UTC

[5/7] flink git commit: [FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor

[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67bd8277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67bd8277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67bd8277

Branch: refs/heads/master
Commit: 67bd8277d1dc1179c30d2dbad0922122ed6f49ee
Parents: dc5650a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java         |  3 ---
 .../partition/consumer/SingleInputGate.java       | 18 +++++++++++++++---
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index a72b92f..9b3ce5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -143,8 +142,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 					consumedPartitionId, partitionLocation);
 		}
 
-		LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
 		return icdd;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f57542..d7ed33c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -554,8 +553,11 @@ public class SingleInputGate implements InputGate {
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
 
-		for (int i = 0; i < inputChannels.length; i++) {
+		int numLocalChannels = 0;
+		int numRemoteChannels = 0;
+		int numUnknownChannels = 0;
 
+		for (int i = 0; i < inputChannels.length; i++) {
 			final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
 			final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
 
@@ -567,6 +569,8 @@ public class SingleInputGate implements InputGate {
 					networkEnvironment.getPartitionRequestMaxBackoff(),
 					metrics
 				);
+
+				numLocalChannels++;
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -576,6 +580,8 @@ public class SingleInputGate implements InputGate {
 					networkEnvironment.getPartitionRequestMaxBackoff(),
 					metrics
 				);
+
+				numRemoteChannels++;
 			}
 			else if (partitionLocation.isUnknown()) {
 				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -586,6 +592,8 @@ public class SingleInputGate implements InputGate {
 					networkEnvironment.getPartitionRequestMaxBackoff(),
 					metrics
 				);
+
+				numUnknownChannels++;
 			}
 			else {
 				throw new IllegalStateException("Unexpected partition location.");
@@ -594,7 +602,11 @@ public class SingleInputGate implements InputGate {
 			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
 
-		LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+		LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+			inputChannels.length,
+			numLocalChannels,
+			numRemoteChannels,
+			numUnknownChannels);
 
 		return inputGate;
 	}