You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/04 07:12:59 UTC

[GitHub] zentol closed pull request #6654: [1.5][FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch

zentol closed pull request #6654: [1.5][FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
URL: https://github.com/apache/flink/pull/6654
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display it otherwise once the pull request is merged):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 441dbf8ea58..124fbf22872 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,7 +42,7 @@
 @Internal
 public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGroup> {
 
-	private final Map<OperatorID, OperatorMetricGroup> operators = new HashMap<>();
+	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
 	static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
 
@@ -144,15 +144,17 @@ public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
 			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
 		}
 		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);
+		// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators
+		final String key = operatorID + name;
 
 		synchronized (this) {
-			OperatorMetricGroup previous = operators.put(operatorID, operator);
+			OperatorMetricGroup previous = operators.put(key, operator);
 			if (previous == null) {
 				// no operator group so far
 				return operator;
 			} else {
 				// already had an operator group. restore that one.
-				operators.put(operatorID, previous);
+				operators.put(key, previous);
 				return previous;
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
new file mode 100644
index 00000000000..29ff6e86671
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.TaskTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metrics-related tests for batch task chains: checks the task-level and per-operator record counters of a two-operator chain.
+ */
+public class ChainedOperatorsMetricTest extends TaskTestBase {
+
+	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+	private static final int NETWORK_BUFFER_SIZE = 1024;
+
+	private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get();
+
+	private final List<Record> outList = new ArrayList<>();
+
+	private static final String HEAD_OPERATOR_NAME = "headoperator";
+	private static final String CHAINED_OPERATOR_NAME = "chainedoperator";
+
+	@Test
+	public void testOperatorIOMetricReuse() throws Exception {
+		// environment: built with an explicit TaskMetricGroup so the test can query it after the task has run
+		initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+		this.mockEnv = new MockEnvironmentBuilder()
+			.setTaskName(HEAD_OPERATOR_NAME)
+			.setMemorySize(MEMORY_MANAGER_SIZE)
+			.setInputSplitProvider(this.inputSplitProvider)
+			.setBufferSize(NETWORK_BUFFER_SIZE)
+			.setMetricGroup(new TaskMetricGroup(
+				NoOpMetricRegistry.INSTANCE,
+				UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(),
+				new JobVertexID(),
+				new AbstractID(),
+				"task",
+				0,
+				0))
+			.build();
+
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		final int numRecords = keyCnt * valCnt;
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
+		addOutput(this.outList);
+
+		// the chained operator (registered under CHAINED_OPERATOR_NAME, see addChainedOperator())
+		addChainedOperator();
+
+		// creates the head operator and assembles the chain
+		registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class);
+		final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
+
+		testTask.invoke();
+
+		Assert.assertEquals(numRecords * 2 * 2, this.outList.size()); // head and chained operator both use DuplicatingFlatMapFunction, hence the factor 2 * 2
+
+		final TaskMetricGroup taskMetricGroup = mockEnv.getMetricGroup();
+
+		// verify task-level metrics: records in are the raw input, records out are what leaves the whole chain
+		{
+			final TaskIOMetricGroup ioMetricGroup = taskMetricGroup.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify head operator metrics: it reads the raw input and emits each record twice
+		{
+			// looks up the existing group: the test relies on addOperator returning the group already registered under this name
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify chained operator metrics: its input is the head's output, and it duplicates every record again
+		{
+			// looks up the existing group: the test relies on addOperator returning the group already registered under this name
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords * 2, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+	}
+
+	private void addChainedOperator() {
+		final TaskConfig chainedConfig = new TaskConfig(new Configuration());
+
+		// input
+		chainedConfig.addInputToGroup(0);
+		chainedConfig.setInputSerializer(serFact, 0);
+
+		// output
+		chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
+		chainedConfig.setOutputSerializer(serFact);
+
+		// driver
+		chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);
+
+		// udf: the same duplicating function that the head operator is registered with
+		chainedConfig.setStubWrapper(new UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class));
+
+		getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, chainedConfig, CHAINED_OPERATOR_NAME);
+	}
+
+	/**
+	 * Simple {@link FlatMapFunction} that emits every input record twice.
+	 */
+	public static class DuplicatingFlatMapFunction extends RichFlatMapFunction<Record, Record> {
+
+		private static final long serialVersionUID = -1152068682935346164L;
+
+		@Override
+		public void flatMap(final Record value, final Collector<Record> out) throws Exception {
+			out.collect(value);
+			out.collect(value);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 4bf94e93463..68858bc0cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -41,7 +41,6 @@
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -108,6 +107,8 @@
 
 	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
 
+	private final TaskMetricGroup taskMetricGroup;
+
 	public static MockEnvironmentBuilder builder() {
 		return new MockEnvironmentBuilder();
 	}
@@ -125,7 +126,8 @@ protected MockEnvironment(
 		int maxParallelism,
 		int parallelism,
 		int subtaskIndex,
-		ClassLoader userCodeClassLoader) {
+		ClassLoader userCodeClassLoader,
+		TaskMetricGroup taskMetricGroup) {
 
 		this.jobID = jobID;
 		this.jobVertexID = jobVertexID;
@@ -150,6 +152,8 @@ protected MockEnvironment(
 
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
+
+		this.taskMetricGroup = taskMetricGroup;
 	}
 
 
@@ -213,7 +217,7 @@ public TaskManagerRuntimeInfo getTaskManagerInfo() {
 
 	@Override
 	public TaskMetricGroup getMetricGroup() {
-		return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+		return taskMetricGroup;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfb10d4293a..dfcc5f312e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -23,6 +23,8 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 
@@ -40,6 +42,7 @@
 	private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
 	private JobID jobID = new JobID();
 	private JobVertexID jobVertexID = new JobVertexID();
+	private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
 	public MockEnvironmentBuilder setTaskName(String taskName) {
 		this.taskName = taskName;
@@ -106,6 +109,11 @@ public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) {
 		return this;
 	}
 
+	public MockEnvironmentBuilder setMetricGroup(TaskMetricGroup taskMetricGroup) {
+		this.taskMetricGroup = taskMetricGroup;
+		return this;
+	}
+
 	public MockEnvironment build() {
 		return new MockEnvironment(
 			jobID,
@@ -120,6 +128,7 @@ public MockEnvironment build() {
 			maxParallelism,
 			parallelism,
 			subtaskIndex,
-			userCodeClassLoader);
+			userCodeClassLoader,
+			taskMetricGroup);
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services