You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/01 09:44:50 UTC
[5/7] flink git commit: [FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor
[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67bd8277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67bd8277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67bd8277
Branch: refs/heads/master
Commit: 67bd8277d1dc1179c30d2dbad0922122ed6f49ee
Parents: dc5650a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100
----------------------------------------------------------------------
.../InputChannelDeploymentDescriptor.java | 3 ---
.../partition/consumer/SingleInputGate.java | 18 +++++++++++++++---
2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index a72b92f..9b3ce5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -143,8 +142,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
consumedPartitionId, partitionLocation);
}
- LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
return icdd;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f57542..d7ed33c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
@@ -554,8 +553,11 @@ public class SingleInputGate implements InputGate {
// Create the input channels. There is one input channel for each consumed partition.
final InputChannel[] inputChannels = new InputChannel[icdd.length];
- for (int i = 0; i < inputChannels.length; i++) {
+ int numLocalChannels = 0;
+ int numRemoteChannels = 0;
+ int numUnknownChannels = 0;
+ for (int i = 0; i < inputChannels.length; i++) {
final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
@@ -567,6 +569,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
+
+ numLocalChannels++;
}
else if (partitionLocation.isRemote()) {
inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -576,6 +580,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
+
+ numRemoteChannels++;
}
else if (partitionLocation.isUnknown()) {
inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -586,6 +592,8 @@ public class SingleInputGate implements InputGate {
networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
+
+ numUnknownChannels++;
}
else {
throw new IllegalStateException("Unexpected partition location.");
@@ -594,7 +602,11 @@ public class SingleInputGate implements InputGate {
inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
}
- LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+ LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+ inputChannels.length,
+ numLocalChannels,
+ numRemoteChannels,
+ numUnknownChannels);
return inputGate;
}