You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/04/27 14:49:51 UTC

[1/4] flink git commit: [hotfix][metrics] keep the non-null assumption and implement tests properly

Repository: flink
Updated Branches:
  refs/heads/master db759c530 -> fff04bfbe


[hotfix][metrics] keep the non-null assumption and implement tests properly

Previous commit in e908b62ab3 caused a regression leading to unit test failures.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4795ce8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4795ce8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4795ce8f

Branch: refs/heads/master
Commit: 4795ce8f501f3c16e363664e8d9927e4a0aaa6ce
Parents: 931ce05
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 27 17:30:59 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 27 15:14:09 2017 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutorTest.java     | 17 +++++++++++++++--
 .../runtime/taskmanager/TaskAsyncCallTest.java     |  6 +++++-
 .../flink/runtime/taskmanager/TaskStopTest.java    |  6 +++++-
 .../apache/flink/runtime/taskmanager/TaskTest.java |  7 +++++--
 4 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index bc6fe68..1d1840e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -547,10 +548,13 @@ public class TaskExecutorTest extends TestLogger {
 
 		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
 
+		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
 		when(taskManagerMetricGroup.addTaskForJob(
 				any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class),
 				anyString(), anyInt(), anyInt())
-			).thenReturn(mock(TaskMetricGroup.class));
+			).thenReturn(taskMetricGroup);
 
 		final HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
 		when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
@@ -1010,6 +1014,15 @@ public class TaskExecutorTest extends TestLogger {
 		jobManagerTable.put(jobId, jobManagerConnection);
 
 		try {
+			final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
+			TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+			when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
+			when(taskManagerMetricGroup.addTaskForJob(
+				any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class),
+				anyString(), anyInt(), anyInt())
+			).thenReturn(taskMetricGroup);
+
 			final TaskExecutor taskManager = new TaskExecutor(
 				rpc,
 				taskManagerConfiguration,
@@ -1020,7 +1033,7 @@ public class TaskExecutorTest extends TestLogger {
 				haServices,
 				mock(HeartbeatServices.class, RETURNS_MOCKS),
 				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
+				taskManagerMetricGroup,
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
 				taskSlotTable,

http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 89ae5da..c6d2fec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
@@ -157,6 +158,9 @@ public class TaskAsyncCallTest {
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
+		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
 		JobInformation jobInformation = new JobInformation(
 			new JobID(),
 			"Job Name",
@@ -194,7 +198,7 @@ public class TaskAsyncCallTest {
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(),
-			mock(TaskMetricGroup.class),
+			taskMetricGroup,
 			consumableNotifier,
 			partitionProducerStateChecker,
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index f3ac809..40678de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -68,6 +69,9 @@ public class TaskStopTest {
 		TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
 		when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration());
 
+		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+
 		task = new Task(
 			mock(JobInformation.class),
 			new TaskInformation(
@@ -95,7 +99,7 @@ public class TaskStopTest {
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
 			tmRuntimeInfo,
-			mock(TaskMetricGroup.class),
+			taskMetricGroup,
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class),
 			mock(Executor.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/4795ce8f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index eb59b1b..2522287 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -53,9 +53,9 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -941,6 +941,9 @@ public class TaskTest extends TestLogger {
 			1,
 			invokable.getName(),
 			new Configuration());
+
+		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
 		
 		return new Task(
 			jobInformation,
@@ -963,7 +966,7 @@ public class TaskTest extends TestLogger {
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig),
-			mock(TaskMetricGroup.class),
+			taskMetricGroup,
 			consumableNotifier,
 			partitionProducerStateChecker,
 			executor);


[3/4] flink git commit: [FLINK-5090] [network] Add metrics for details about inbound/outbound network queues

Posted by al...@apache.org.
[FLINK-5090] [network] Add metrics for details about inbound/outbound network queues

These metrics are optimised to go through the channels only once in order to
gather all metrics, i.e. min, max, avg and sum. Whenever a request to any
of those is made, all metrics are refreshed and cached. Requests to the other
metrics will be served from the cache. However, each value will be served only
once from the cache and a second call to retrieve the minimum, for example,
will refresh the cache for all values.

This setup may seem a bit strange at first, but it ensures that the statistics
belong together logically and originate from a common point in time. This is not
necessarily the point in time the metric was requested, though.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cba8bb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cba8bb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cba8bb3

Branch: refs/heads/master
Commit: 6cba8bb32a822a3f6bc7a798224390d24dd19cfc
Parents: db759c5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 17 19:36:56 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 27 15:14:09 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |   6 +
 .../partition/PipelinedSubpartition.java        |   5 +-
 .../io/network/partition/ResultPartition.java   |   7 +-
 .../partition/ResultPartitionMetrics.java       | 136 +++++++++++++++
 .../network/partition/ResultSubpartition.java   |   7 +-
 .../partition/SpillableSubpartition.java        |   5 +-
 .../partition/consumer/InputGateMetrics.java    | 167 +++++++++++++++++++
 .../partition/consumer/RemoteInputChannel.java  |   7 +-
 .../partition/consumer/SingleInputGate.java     |   6 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  17 ++
 10 files changed, 351 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 61c1b27..6863d14 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -251,6 +251,12 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
 
 	/**
+	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
+	 */
+	@PublicEvolving
+	public static final String NETWORK_DETAILED_METRICS_KEY = "taskmanager.net.detailed-metrics";
+
+	/**
 	 * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
 	 *
 	 * @deprecated Use {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} instead

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 9e2f5ba..4fd74e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -211,7 +211,8 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public int getNumberOfQueuedBuffers() {
-			return buffers.size();
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		// since we do not synchronize, the size may actually be lower than 0!
+		return Math.max(buffers.size(), 0);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index eb1418b..29a7caf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -232,7 +233,7 @@ public class ResultPartition implements BufferPoolOwner {
 		int totalBuffers = 0;
 
 		for (ResultSubpartition subpartition : subpartitions) {
-			totalBuffers += subpartition.getNumberOfQueuedBuffers();
+			totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
 		}
 
 		return totalBuffers;
@@ -446,6 +447,10 @@ public class ResultPartition implements BufferPoolOwner {
 				this, subpartitionIndex, pendingReferences);
 	}
 
+	ResultSubpartition[] getAllPartitions() {
+		return subpartitions;
+	}
+
 	// ------------------------------------------------------------------------
 
 	private void checkInProduceState() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
new file mode 100644
index 0000000..0fd3210
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class ResultPartitionMetrics {
+
+	private final ResultPartition partition;
+
+	private int lastMin = -1;
+
+	private int lastMax = -1;
+
+	private float lastAvg = -1.0f;
+
+	// ------------------------------------------------------------------------
+
+	private ResultPartitionMetrics(ResultPartition partition) {
+		this.partition = checkNotNull(partition);
+	}
+
+	// ------------------------------------------------------------------------
+
+	// these methods are package private to make access from the nested classes faster 
+
+	int refreshAndGetMin() {
+		int min;
+		if ((min = lastMin) == -1) {
+			refresh();
+			min = lastMin;
+		}
+
+		lastMin = -1;
+		return min;
+	}
+
+	int refreshAndGetMax() {
+		int max;
+		if ((max = lastMax) == -1) {
+			refresh();
+			max = lastMax;
+		}
+
+		lastMax = -1;
+		return max;
+	}
+
+	float refreshAndGetAvg() {
+		float avg;
+		if ((avg = lastAvg) < 0.0f) {
+			refresh();
+			avg = lastAvg;
+		}
+
+		lastAvg = -1.0f;
+		return avg;
+	}
+
+	private void refresh() {
+		int min = Integer.MAX_VALUE;
+		int max = 0;
+
+		for (ResultSubpartition part : partition.getAllPartitions()) {
+			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+			min = Math.min(min, size);
+			max = Math.max(max, size);
+		}
+
+		this.lastMin = min;
+		this.lastMax = max;
+		this.lastAvg = partition.getTotalNumberOfBuffers() / (float) partition.getNumberOfSubpartitions();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Gauges to access the stats
+	// ------------------------------------------------------------------------
+
+	private Gauge<Integer> getMinQueueLenGauge() {
+		return new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return refreshAndGetMin();
+			}
+		};
+	}
+
+	private Gauge<Integer> getMaxQueueLenGauge() {
+		return new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return refreshAndGetMax();
+			}
+		};
+	}
+
+	private Gauge<Float> getAvgQueueLenGauge() {
+		return new Gauge<Float>() {
+			@Override
+			public Float getValue() {
+				return refreshAndGetAvg();
+			}
+		};
+	}
+
+	// ------------------------------------------------------------------------
+	//  Static access
+	// ------------------------------------------------------------------------
+
+	public static void registerQueueLengthMetrics(MetricGroup group, ResultPartition partition) {
+		ResultPartitionMetrics metrics = new ResultPartitionMetrics(partition);
+
+		group.gauge("min-queue-len", metrics.getMinQueueLenGauge());
+		group.gauge("max-queue-len", metrics.getMaxQueueLenGauge());
+		group.gauge("avg-queue-len", metrics.getAvgQueueLenGauge());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index dd0e152..d3cd887 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -83,6 +83,11 @@ public abstract class ResultSubpartition {
 
 	abstract public boolean isReleased();
 
-	abstract public int getNumberOfQueuedBuffers();
+	/**
+	 * Makes a best effort to get the current size of the queue.
+	 * This method must not acquire locks or interfere with the task and network threads in
+	 * any way.
+	 */
+	abstract public int unsynchronizedGetNumberOfQueuedBuffers();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index ae97c42..a578188 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -237,8 +237,9 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public int getNumberOfQueuedBuffers() {
-		return buffers.size();
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		// since we do not synchronize, the size may actually be lower than 0!
+		return Math.max(buffers.size(), 0);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
new file mode 100644
index 0000000..b5eb860
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class InputGateMetrics {
+
+	private final SingleInputGate inputGate;
+
+	private long lastTotal = -1;
+
+	private int lastMin = -1;
+
+	private int lastMax = -1;
+
+	private float lastAvg = -1.0f;
+
+	// ------------------------------------------------------------------------
+
+	private InputGateMetrics(SingleInputGate inputGate) {
+		this.inputGate = checkNotNull(inputGate);
+	}
+
+	// ------------------------------------------------------------------------
+
+	// these methods are package private to make access from the nested classes faster 
+
+	long refreshAndGetTotal() {
+		long total;
+		if ((total = lastTotal) == -1) {
+			refresh();
+			total = lastTotal;
+		}
+
+		lastTotal = -1;
+		return total;
+	}
+
+	int refreshAndGetMin() {
+		int min;
+		if ((min = lastMin) == -1) {
+			refresh();
+			min = lastMin;
+		}
+
+		lastMin = -1;
+		return min;
+	}
+
+	int refreshAndGetMax() {
+		int max;
+		if ((max = lastMax) == -1) {
+			refresh();
+			max = lastMax;
+		}
+
+		lastMax = -1;
+		return max;
+	}
+
+	float refreshAndGetAvg() {
+		float avg;
+		if ((avg = lastAvg) < 0.0f) {
+			refresh();
+			avg = lastAvg;
+		}
+
+		lastAvg = -1.0f;
+		return avg;
+	}
+
+	private void refresh() {
+		long total = 0;
+		int min = Integer.MAX_VALUE;
+		int max = 0;
+		int count = 0;
+
+		for (InputChannel channel : inputGate.getInputChannels().values()) {
+			if (channel.getClass() == RemoteInputChannel.class) {
+				RemoteInputChannel rc = (RemoteInputChannel) channel;
+
+				int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
+				total += size;
+				min = Math.min(min, size);
+				max = Math.max(max, size);
+				count++;
+			}
+		}
+
+		this.lastMin = min;
+		this.lastMax = max;
+		this.lastAvg = total / (float) count;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Gauges to access the stats
+	// ------------------------------------------------------------------------
+
+	private Gauge<Long> getTotalQueueLenGauge() {
+		return new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return refreshAndGetTotal();
+			}
+		};
+	}
+
+	private Gauge<Integer> getMinQueueLenGauge() {
+		return new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return refreshAndGetMin();
+			}
+		};
+	}
+
+	private Gauge<Integer> getMaxQueueLenGauge() {
+		return new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return refreshAndGetMax();
+			}
+		};
+	}
+
+	private Gauge<Float> getAvgQueueLenGauge() {
+		return new Gauge<Float>() {
+			@Override
+			public Float getValue() {
+				return refreshAndGetAvg();
+			}
+		};
+	}
+
+	// ------------------------------------------------------------------------
+	//  Static access
+	// ------------------------------------------------------------------------
+
+	public static void registerQueueLengthMetrics(MetricGroup group, SingleInputGate gate) {
+		InputGateMetrics metrics = new InputGateMetrics(gate);
+
+		group.gauge("total-queue-len", metrics.getTotalQueueLenGauge());
+		group.gauge("min-queue-len", metrics.getMinQueueLenGauge());
+		group.gauge("max-queue-len", metrics.getMaxQueueLenGauge());
+		group.gauge("avg-queue-len", metrics.getAvgQueueLenGauge());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index ed3122e..719f340 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -54,7 +53,7 @@ public class RemoteInputChannel extends InputChannel {
 	 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
 	 * is consumed by the receiving task thread.
 	 */
-	private final Queue<Buffer> receivedBuffers = new ArrayDeque<>();
+	private final ArrayDeque<Buffer> receivedBuffers = new ArrayDeque<>();
 
 	/**
 	 * Flag indicating whether this channel has been released. Either called by the receiving task
@@ -219,6 +218,10 @@ public class RemoteInputChannel extends InputChannel {
 		}
 	}
 
+	public int unsynchronizedGetNumberOfQueuedBuffers() {
+		return Math.max(0, receivedBuffers.size());
+	}
+
 	public InputChannelID getInputChannelId() {
 		return id;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index afe8722..ebfb300 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import com.google.common.collect.Maps;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -48,6 +46,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
@@ -188,7 +187,7 @@ public class SingleInputGate implements InputGate {
 		checkArgument(numberOfInputChannels > 0);
 		this.numberOfInputChannels = numberOfInputChannels;
 
-		this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels);
+		this.inputChannels = new HashMap<>(numberOfInputChannels);
 		this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
 
 		this.taskActions = checkNotNull(taskActions);
@@ -566,7 +565,6 @@ public class SingleInputGate implements InputGate {
 
 	// ------------------------------------------------------------------------
 
-	@VisibleForTesting
 	Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
 		return inputChannels;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cba8bb3/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ef934de..f3dacc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystemSafetyNet;
@@ -53,6 +54,8 @@ import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionMetrics;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGateMetrics;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -386,6 +389,20 @@ public class Task implements Runnable, TaskActions {
 			++counter;
 		}
 
+		// register detailed network metrics, if configured
+		if (tmConfig.getBoolean(ConfigConstants.NETWORK_DETAILED_METRICS_KEY, false)) {
+			// output metrics
+			for (int i = 0; i < producedPartitions.length; i++) {
+				ResultPartitionMetrics.registerQueueLengthMetrics(
+						metricGroup.addGroup("netout_" + i), producedPartitions[i]);
+			}
+
+			for (int i = 0; i < inputGates.length; i++) {
+				InputGateMetrics.registerQueueLengthMetrics(
+						metricGroup.addGroup("netin_" + i), inputGates[i]);
+			}
+		}
+
 		invokableHasBeenCanceled = new AtomicBoolean(false);
 
 		// finally, create the executing thread, but do not start it


[4/4] flink git commit: [FLINK-5090] Fixups for "Add metrics for details about inbound/outbound network queues"

Posted by al...@apache.org.
[FLINK-5090] Fixups for "Add metrics for details about inbound/outbound network queues"

This closes #3348.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fff04bfb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fff04bfb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fff04bfb

Branch: refs/heads/master
Commit: fff04bfbea5eeb3fc0d6a59db961c4b678ceab58
Parents: 4795ce8
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Feb 21 14:35:05 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 27 16:12:28 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  20 +++-
 .../flink/configuration/ConfigConstants.java    |   6 -
 .../flink/configuration/TaskManagerOptions.java |   8 ++
 .../io/network/partition/ResultPartition.java   |  10 ++
 .../partition/ResultPartitionMetrics.java       | 111 +++++++++++-------
 .../partition/consumer/InputGateMetrics.java    | 115 +++++++++++--------
 .../apache/flink/runtime/taskmanager/Task.java  |  41 ++++---
 7 files changed, 200 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fff04bfb/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index e51f82e..290a452 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -640,7 +640,7 @@ Thus, in order to infer the metric identifier:
       <td>The number of allocated memory segments.</td>
     </tr>
     <tr>
-      <th rowspan="4">Task</th>
+      <th rowspan="8">Task</th>
       <td rowspan="4">buffers</td>
       <td>inputQueueLength</td>
       <td>The number of queued input buffers.</td>
@@ -657,6 +657,24 @@ Thus, in order to infer the metric identifier:
       <td>outPoolUsage</td>
       <td>An estimate of the output buffers usage.</td>
     </tr>
+    <tr>
+      <td rowspan="4">Network.&lt;Input|Output&gt;.&lt;gate&gt;<br />
+        <strong>(only available if <tt>taskmanager.net.detailed-metrics</tt> config option is set)</strong></td>
+      <td>totalQueueLen</td>
+      <td>Total number of queued buffers in all input/output channels.</td>
+    </tr>
+    <tr>
+      <td>minQueueLen</td>
+      <td>Minimum number of queued buffers in all input/output channels.</td>
+    </tr>
+    <tr>
+      <td>maxQueueLen</td>
+      <td>Maximum number of queued buffers in all input/output channels.</td>
+    </tr>
+    <tr>
+      <td>avgQueueLen</td>
+      <td>Average number of queued buffers in all input/output channels.</td>
+    </tr>
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fff04bfb/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6863d14..61c1b27 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -251,12 +251,6 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
 
 	/**
-	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths
-	 */
-	@PublicEvolving
-	public static final String NETWORK_DETAILED_METRICS_KEY = "taskmanager.net.detailed-metrics";
-
-	/**
 	 * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
 	 *
 	 * @deprecated Use {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} instead

http://git-wip-us.apache.org/repos/asf/flink/blob/fff04bfb/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 9a00a0f..777ee21 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -115,6 +115,14 @@ public class TaskManagerOptions {
 		key("taskmanager.net.memory.extra-buffers-per-gate")
 			.defaultValue(8);
 
+	/**
+	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
+	 * lengths.
+	 */
+	public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
+			key("taskmanager.net.detailed-metrics")
+			.defaultValue(false);
+
 	// ------------------------------------------------------------------------
 	//  Task Options
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fff04bfb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 29a7caf..d207f60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -221,10 +221,20 @@ public class ResultPartition implements BufferPoolOwner {
 		return bufferPool;
 	}
 
+	/**
+	 * Returns the total number of processed network buffers since initialization.
+	 *
+	 * @return overall number of processed network buffers
+	 */
 	public int getTotalNumberOfBuffers() {
 		return totalNumberOfBuffers;
 	}
 
+	/**
+	 * Returns the total size of processed network buffers since initialization.
+	 *
+	 * @return overall size of processed network buffers
+	 */
 	public long getTotalNumberOfBytes() {
 		return totalNumberOfBytes;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fff04bfb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
index 0fd3210..fde2ebd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionMetrics.java
@@ -23,16 +23,13 @@ import org.apache.flink.metrics.MetricGroup;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Collects metrics of a result partition.
+ */
 public class ResultPartitionMetrics {
 
 	private final ResultPartition partition;
 
-	private int lastMin = -1;
-
-	private int lastMax = -1;
-
-	private float lastAvg = -1.0f;
-
 	// ------------------------------------------------------------------------
 
 	private ResultPartitionMetrics(ResultPartition partition) {
@@ -41,60 +38,95 @@ public class ResultPartitionMetrics {
 
 	// ------------------------------------------------------------------------
 
-	// these methods are package private to make access from the nested classes faster 
+	// these methods are package private to make access from the nested classes faster
 
-	int refreshAndGetMin() {
-		int min;
-		if ((min = lastMin) == -1) {
-			refresh();
-			min = lastMin;
+	/**
+	 * Iterates over all sub-partitions and collects the total number of queued buffers in a
+	 * best-effort way.
+	 *
+	 * @return total number of queued buffers
+	 */
+	long refreshAndGetTotal() {
+		long total = 0;
+
+		for (ResultSubpartition part : partition.getAllPartitions()) {
+			total += part.unsynchronizedGetNumberOfQueuedBuffers();
 		}
 
-		lastMin = -1;
-		return min;
+		return total;
 	}
 
-	int refreshAndGetMax() {
-		int max;
-		if ((max = lastMax) == -1) {
-			refresh();
-			max = lastMax;
-		}
+	/**
+	 * Iterates over all sub-partitions and collects the minimum number of queued buffers in a
+	 * sub-partition in a best-effort way.
+	 *
+	 * @return minimum number of queued buffers per sub-partition (<tt>0</tt> if no sub-partitions exist)
+	 */
+	int refreshAndGetMin() {
+		int min = Integer.MAX_VALUE;
 
-		lastMax = -1;
-		return max;
-	}
+		ResultSubpartition[] allPartitions = partition.getAllPartitions();
+		if (allPartitions.length == 0) {
+			// meaningful value when no sub-partitions exist:
+			return 0;
+		}
 
-	float refreshAndGetAvg() {
-		float avg;
-		if ((avg = lastAvg) < 0.0f) {
-			refresh();
-			avg = lastAvg;
+		for (ResultSubpartition part : allPartitions) {
+			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+			min = Math.min(min, size);
 		}
 
-		lastAvg = -1.0f;
-		return avg;
+		return min;
 	}
 
-	private void refresh() {
-		int min = Integer.MAX_VALUE;
+	/**
+	 * Iterates over all sub-partitions and collects the maximum number of queued buffers in a
+	 * sub-partition in a best-effort way.
+	 *
+	 * @return maximum number of queued buffers per sub-partition
+	 */
+	int refreshAndGetMax() {
 		int max = 0;
 
 		for (ResultSubpartition part : partition.getAllPartitions()) {
 			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
-			min = Math.min(min, size);
 			max = Math.max(max, size);
 		}
 
-		this.lastMin = min;
-		this.lastMax = max;
-		this.lastAvg = partition.getTotalNumberOfBuffers() / (float) partition.getNumberOfSubpartitions();
+		return max;
+	}
+
+	/**
+	 * Iterates over all sub-partitions and collects the average number of queued buffers in a
+	 * sub-partition in a best-effort way.
+	 *
+	 * @return average number of queued buffers per sub-partition
+	 */
+	float refreshAndGetAvg() {
+		long total = 0;
+
+		ResultSubpartition[] allPartitions = partition.getAllPartitions();
+		for (ResultSubpartition part : allPartitions) {
+			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+			total += size;
+		}
+
+		return total / (float) allPartitions.length;
 	}
 
 	// ------------------------------------------------------------------------
 	//  Gauges to access the stats
 	// ------------------------------------------------------------------------
 
+	private Gauge<Long> getTotalQueueLenGauge() {
+		return new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return refreshAndGetTotal();
+			}
+		};
+	}
+
 	private Gauge<Integer> getMinQueueLenGauge() {
 		return new Gauge<Integer>() {
 			@Override
@@ -129,8 +161,9 @@ public class ResultPartitionMetrics {
 	public static void registerQueueLengthMetrics(MetricGroup group, ResultPartition partition) {
 		ResultPartitionMetrics metrics = new ResultPartitionMetrics(partition);
 
-		group.gauge("min-queue-len", metrics.getMinQueueLenGauge());
-		group.gauge("max-queue-len", metrics.getMaxQueueLenGauge());
-		group.gauge("avg-queue-len", metrics.getAvgQueueLenGauge());
+		group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
+		group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
+		group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
+		group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fff04bfb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index b5eb860..c9fed9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -21,20 +21,17 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.Collection;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Collects metrics of an input gate.
+ */
 public class InputGateMetrics {
 
 	private final SingleInputGate inputGate;
 
-	private long lastTotal = -1;
-
-	private int lastMin = -1;
-
-	private int lastMax = -1;
-
-	private float lastAvg = -1.0f;
-
 	// ------------------------------------------------------------------------
 
 	private InputGateMetrics(SingleInputGate inputGate) {
@@ -45,71 +42,95 @@ public class InputGateMetrics {
 
 	// these methods are package private to make access from the nested classes faster 
 
+	/**
+	 * Iterates over all input channels and collects the total number of queued buffers in a
+	 * best-effort way.
+	 *
+	 * @return total number of queued buffers
+	 */
 	long refreshAndGetTotal() {
-		long total;
-		if ((total = lastTotal) == -1) {
-			refresh();
-			total = lastTotal;
+		long total = 0;
+
+		for (InputChannel channel : inputGate.getInputChannels().values()) {
+			if (channel instanceof RemoteInputChannel) {
+				RemoteInputChannel rc = (RemoteInputChannel) channel;
+
+				total += rc.unsynchronizedGetNumberOfQueuedBuffers();
+			}
 		}
 
-		lastTotal = -1;
 		return total;
 	}
 
+	/**
+	 * Iterates over all input channels and collects the minimum number of queued buffers in a
+	 * channel in a best-effort way.
+	 *
+	 * @return minimum number of queued buffers per channel (<tt>0</tt> if no channels exist)
+	 */
 	int refreshAndGetMin() {
-		int min;
-		if ((min = lastMin) == -1) {
-			refresh();
-			min = lastMin;
+		int min = Integer.MAX_VALUE;
+
+		Collection<InputChannel> channels = inputGate.getInputChannels().values();
+		if (channels.isEmpty()) {
+			// meaningful value when no channels exist:
+			return 0;
+		}
+
+		for (InputChannel channel : channels) {
+			if (channel instanceof RemoteInputChannel) {
+				RemoteInputChannel rc = (RemoteInputChannel) channel;
+
+				int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
+				min = Math.min(min, size);
+			}
 		}
 
-		lastMin = -1;
 		return min;
 	}
 
+	/**
+	 * Iterates over all input channels and collects the maximum number of queued buffers in a
+	 * channel in a best-effort way.
+	 *
+	 * @return maximum number of queued buffers per channel
+	 */
 	int refreshAndGetMax() {
-		int max;
-		if ((max = lastMax) == -1) {
-			refresh();
-			max = lastMax;
-		}
+		int max = 0;
 
-		lastMax = -1;
-		return max;
-	}
+		for (InputChannel channel : inputGate.getInputChannels().values()) {
+			if (channel instanceof RemoteInputChannel) {
+				RemoteInputChannel rc = (RemoteInputChannel) channel;
 
-	float refreshAndGetAvg() {
-		float avg;
-		if ((avg = lastAvg) < 0.0f) {
-			refresh();
-			avg = lastAvg;
+				int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
+				max = Math.max(max, size);
+			}
 		}
 
-		lastAvg = -1.0f;
-		return avg;
+		return max;
 	}
 
-	private void refresh() {
+	/**
+	 * Iterates over all input channels and collects the average number of queued buffers in a
+	 * channel in a best-effort way.
+	 *
+	 * @return average number of queued buffers per channel
+	 */
+	float refreshAndGetAvg() {
 		long total = 0;
-		int min = Integer.MAX_VALUE;
-		int max = 0;
 		int count = 0;
 
 		for (InputChannel channel : inputGate.getInputChannels().values()) {
-			if (channel.getClass() == RemoteInputChannel.class) {
+			if (channel instanceof RemoteInputChannel) {
 				RemoteInputChannel rc = (RemoteInputChannel) channel;
 
 				int size = rc.unsynchronizedGetNumberOfQueuedBuffers();
 				total += size;
-				min = Math.min(min, size);
-				max = Math.max(max, size);
-				count++;
+				++count;
 			}
 		}
 
-		this.lastMin = min;
-		this.lastMax = max;
-		this.lastAvg = total / (float) count;
+		return total / (float) count;
 	}
 
 	// ------------------------------------------------------------------------
@@ -159,9 +180,9 @@ public class InputGateMetrics {
 	public static void registerQueueLengthMetrics(MetricGroup group, SingleInputGate gate) {
 		InputGateMetrics metrics = new InputGateMetrics(gate);
 
-		group.gauge("total-queue-len", metrics.getTotalQueueLenGauge());
-		group.gauge("min-queue-len", metrics.getMinQueueLenGauge());
-		group.gauge("max-queue-len", metrics.getMaxQueueLenGauge());
-		group.gauge("avg-queue-len", metrics.getAvgQueueLenGauge());
+		group.gauge("totalQueueLen", metrics.getTotalQueueLenGauge());
+		group.gauge("minQueueLen", metrics.getMinQueueLenGauge());
+		group.gauge("maxQueueLen", metrics.getMaxQueueLenGauge());
+		group.gauge("avgQueueLen", metrics.getAvgQueueLenGauge());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fff04bfb/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 5a402fd..dab0f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -390,27 +390,10 @@ public class Task implements Runnable, TaskActions {
 			++counter;
 		}
 
-		// register detailed network metrics, if configured
-		if (tmConfig.getBoolean(ConfigConstants.NETWORK_DETAILED_METRICS_KEY, false)) {
-			// output metrics
-			for (int i = 0; i < producedPartitions.length; i++) {
-				ResultPartitionMetrics.registerQueueLengthMetrics(
-						metricGroup.addGroup("netout_" + i), producedPartitions[i]);
-			}
-
-			for (int i = 0; i < inputGates.length; i++) {
-				InputGateMetrics.registerQueueLengthMetrics(
-						metricGroup.addGroup("netin_" + i), inputGates[i]);
-			}
-		}
-
 		invokableHasBeenCanceled = new AtomicBoolean(false);
 
 		// finally, create the executing thread, but do not start it
 		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
-
-		// add metrics for buffers
-		this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
 	}
 
 	// ------------------------------------------------------------------------
@@ -616,6 +599,28 @@ public class Task implements Runnable, TaskActions {
 
 			network.registerTask(this);
 
+			// add metrics for buffers
+			this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
+
+			// register detailed network metrics, if configured
+			if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
+				// similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
+				MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
+				MetricGroup outputGroup = networkGroup.addGroup("Output");
+				MetricGroup inputGroup = networkGroup.addGroup("Input");
+
+				// output metrics
+				for (int i = 0; i < producedPartitions.length; i++) {
+					ResultPartitionMetrics.registerQueueLengthMetrics(
+						outputGroup.addGroup(i), producedPartitions[i]);
+				}
+
+				for (int i = 0; i < inputGates.length; i++) {
+					InputGateMetrics.registerQueueLengthMetrics(
+						inputGroup.addGroup(i), inputGates[i]);
+				}
+			}
+
 			// next, kick off the background copying of files for the distributed cache
 			try {
 				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :


[2/4] flink git commit: [hotfix][metrics] remove an unnecessary check for non-null

Posted by al...@apache.org.
[hotfix][metrics] remove an unnecessary check for non-null

The metrics group given to a Task must always be non-null otherwise the code
would have crashed anyway. Similarly, the group returned by getIOMetricGroup()
is always present.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/931ce05a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/931ce05a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/931ce05a

Branch: refs/heads/master
Commit: 931ce05a47ce1063c056c1a558a974fbce559f41
Parents: 6cba8bb
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 27 11:14:35 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 27 15:14:09 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/taskmanager/Task.java     | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/931ce05a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f3dacc3..5a402fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -74,6 +74,7 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.URL;
@@ -280,7 +281,7 @@ public class Task implements Runnable, TaskActions {
 		LibraryCacheManager libraryCache,
 		FileCache fileCache,
 		TaskManagerRuntimeInfo taskManagerConfig,
-		TaskMetricGroup metricGroup,
+		@Nonnull TaskMetricGroup metricGroup,
 		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
 		PartitionProducerStateChecker partitionProducerStateChecker,
 		Executor executor) {
@@ -408,10 +409,8 @@ public class Task implements Runnable, TaskActions {
 		// finally, create the executing thread, but do not start it
 		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
 
-		if (this.metrics != null && this.metrics.getIOMetricGroup() != null) {
-			// add metrics for buffers
-			this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
-		}
+		// add metrics for buffers
+		this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
 	}
 
 	// ------------------------------------------------------------------------