You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/04 07:13:07 UTC

[flink] branch release-1.6 updated: [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 38f7552  [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
38f7552 is described below

commit 38f75527335a711ee0374a0fd3d28087d8568fb9
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 18:37:07 2018 +0200

    [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
---
 .../runtime/metrics/groups/TaskMetricGroup.java    |   8 +-
 .../chaining/ChainedOperatorsMetricTest.java       | 175 +++++++++++++++++++++
 .../operators/testutils/MockEnvironment.java       |  10 +-
 .../testutils/MockEnvironmentBuilder.java          |  11 +-
 4 files changed, 197 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 441dbf8..124fbf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGroup> {
 
-	private final Map<OperatorID, OperatorMetricGroup> operators = new HashMap<>();
+	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
 	static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
 
@@ -144,15 +144,17 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
 		}
 		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);
+		// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators
+		final String key = operatorID + name;
 
 		synchronized (this) {
-			OperatorMetricGroup previous = operators.put(operatorID, operator);
+			OperatorMetricGroup previous = operators.put(key, operator);
 			if (previous == null) {
 				// no operator group so far
 				return operator;
 			} else {
 				// already had an operator group. restore that one.
-				operators.put(operatorID, previous);
+				operators.put(key, previous);
 				return previous;
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
new file mode 100644
index 0000000..29ff6e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.TaskTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metrics related tests for batch task chains.
+ */
+public class ChainedOperatorsMetricTest extends TaskTestBase {
+
+	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+	private static final int NETWORK_BUFFER_SIZE = 1024;
+
+	private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get();
+
+	private final List<Record> outList = new ArrayList<>();
+
+	private static final String HEAD_OPERATOR_NAME = "headoperator";
+	private static final String CHAINED_OPERATOR_NAME = "chainedoperator";
+
+	@Test
+	public void testOperatorIOMetricReuse() throws Exception {
+		// environment
+		initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+		this.mockEnv = new MockEnvironmentBuilder()
+			.setTaskName(HEAD_OPERATOR_NAME)
+			.setMemorySize(MEMORY_MANAGER_SIZE)
+			.setInputSplitProvider(this.inputSplitProvider)
+			.setBufferSize(NETWORK_BUFFER_SIZE)
+			.setMetricGroup(new TaskMetricGroup(
+				NoOpMetricRegistry.INSTANCE,
+				UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(),
+				new JobVertexID(),
+				new AbstractID(),
+				"task",
+				0,
+				0))
+			.build();
+
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		final int numRecords = keyCnt * valCnt;
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
+		addOutput(this.outList);
+
+		// the chained operator
+		addChainedOperator();
+
+		// creates the head operator and assembles the chain
+		registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class);
+		final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
+
+		testTask.invoke();
+
+		Assert.assertEquals(numRecords * 2 * 2, this.outList.size());
+
+		final TaskMetricGroup taskMetricGroup = mockEnv.getMetricGroup();
+
+		// verify task-level metrics
+		{
+			final TaskIOMetricGroup ioMetricGroup = taskMetricGroup.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify head operator metrics
+		{
+			// this only returns the existing group and doesn't create a new one
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify chained operator metrics
+		{
+			// this only returns the existing group and doesn't create a new one
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords * 2, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+	}
+
+	private void addChainedOperator() {
+		final TaskConfig chainedConfig = new TaskConfig(new Configuration());
+
+		// input
+		chainedConfig.addInputToGroup(0);
+		chainedConfig.setInputSerializer(serFact, 0);
+
+		// output
+		chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
+		chainedConfig.setOutputSerializer(serFact);
+
+		// driver
+		chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);
+
+		// udf
+		chainedConfig.setStubWrapper(new UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class));
+
+		getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, chainedConfig, CHAINED_OPERATOR_NAME);
+	}
+
+	/**
+	 * Simple {@link FlatMapFunction} that duplicates the input.
+	 */
+	public static class DuplicatingFlatMapFunction extends RichFlatMapFunction<Record, Record> {
+
+		private static final long serialVersionUID = -1152068682935346164L;
+
+		@Override
+		public void flatMap(final Record value, final Collector<Record> out) throws Exception {
+			out.collect(value);
+			out.collect(value);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 4bf94e9..68858bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -108,6 +107,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
 
+	private final TaskMetricGroup taskMetricGroup;
+
 	public static MockEnvironmentBuilder builder() {
 		return new MockEnvironmentBuilder();
 	}
@@ -125,7 +126,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		int maxParallelism,
 		int parallelism,
 		int subtaskIndex,
-		ClassLoader userCodeClassLoader) {
+		ClassLoader userCodeClassLoader,
+		TaskMetricGroup taskMetricGroup) {
 
 		this.jobID = jobID;
 		this.jobVertexID = jobVertexID;
@@ -150,6 +152,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
+
+		this.taskMetricGroup = taskMetricGroup;
 	}
 
 
@@ -213,7 +217,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	@Override
 	public TaskMetricGroup getMetricGroup() {
-		return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+		return taskMetricGroup;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfb10d4..dfcc5f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 
@@ -40,6 +42,7 @@ public class MockEnvironmentBuilder {
 	private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
 	private JobID jobID = new JobID();
 	private JobVertexID jobVertexID = new JobVertexID();
+	private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
 	public MockEnvironmentBuilder setTaskName(String taskName) {
 		this.taskName = taskName;
@@ -106,6 +109,11 @@ public class MockEnvironmentBuilder {
 		return this;
 	}
 
+	public MockEnvironmentBuilder setMetricGroup(TaskMetricGroup taskMetricGroup) {
+		this.taskMetricGroup = taskMetricGroup;
+		return this;
+	}
+
 	public MockEnvironment build() {
 		return new MockEnvironment(
 			jobID,
@@ -120,6 +128,7 @@ public class MockEnvironmentBuilder {
 			maxParallelism,
 			parallelism,
 			subtaskIndex,
-			userCodeClassLoader);
+			userCodeClassLoader,
+			taskMetricGroup);
 	}
 }