You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 13:03:36 UTC
[flink] 01/02: [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93ac95866e4473982a89e563d58ef0e374b3c0ba
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 18:37:07 2018 +0200
[FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
---
.../runtime/metrics/groups/TaskMetricGroup.java | 8 +-
.../chaining/ChainedOperatorsMetricTest.java | 175 +++++++++++++++++++++
.../operators/testutils/MockEnvironment.java | 10 +-
.../testutils/MockEnvironmentBuilder.java | 11 +-
4 files changed, 197 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 441dbf8..124fbf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGroup> {
- private final Map<OperatorID, OperatorMetricGroup> operators = new HashMap<>();
+ private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
@@ -144,15 +144,17 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
}
OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);
+ // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators
+ final String key = operatorID + name;
synchronized (this) {
- OperatorMetricGroup previous = operators.put(operatorID, operator);
+ OperatorMetricGroup previous = operators.put(key, operator);
if (previous == null) {
// no operator group so far
return operator;
} else {
// already had an operator group. restore that one.
- operators.put(operatorID, previous);
+ operators.put(key, previous);
return previous;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
new file mode 100644
index 0000000..29ff6e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.TaskTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metrics related tests for batch task chains.
+ */
+public class ChainedOperatorsMetricTest extends TaskTestBase {
+
+ private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+ private static final int NETWORK_BUFFER_SIZE = 1024;
+
+ private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get();
+
+ private final List<Record> outList = new ArrayList<>();
+
+ private static final String HEAD_OPERATOR_NAME = "headoperator";
+ private static final String CHAINED_OPERATOR_NAME = "chainedoperator";
+
+ @Test
+ public void testOperatorIOMetricReuse() throws Exception {
+ // environment
+ initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+ this.mockEnv = new MockEnvironmentBuilder()
+ .setTaskName(HEAD_OPERATOR_NAME)
+ .setMemorySize(MEMORY_MANAGER_SIZE)
+ .setInputSplitProvider(this.inputSplitProvider)
+ .setBufferSize(NETWORK_BUFFER_SIZE)
+ .setMetricGroup(new TaskMetricGroup(
+ NoOpMetricRegistry.INSTANCE,
+ UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(),
+ new JobVertexID(),
+ new AbstractID(),
+ "task",
+ 0,
+ 0))
+ .build();
+
+ final int keyCnt = 100;
+ final int valCnt = 20;
+ final int numRecords = keyCnt * valCnt;
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
+ addOutput(this.outList);
+
+ // the chained operator
+ addChainedOperator();
+
+ // creates the head operator and assembles the chain
+ registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class);
+ final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
+
+ testTask.invoke();
+
+ Assert.assertEquals(numRecords * 2 * 2, this.outList.size());
+
+ final TaskMetricGroup taskMetricGroup = mockEnv.getMetricGroup();
+
+ // verify task-level metrics
+ {
+ final TaskIOMetricGroup ioMetricGroup = taskMetricGroup.getIOMetricGroup();
+ final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+ final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+ Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+ Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+ }
+
+ // verify head operator metrics
+ {
+ // this only returns the existing group and doesn't create a new one
+ final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+ final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+ final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+ final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+ Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+ Assert.assertEquals(numRecords * 2, numRecordsOutCounter.getCount());
+ }
+
+ // verify chained operator metrics
+ {
+ // this only returns the existing group and doesn't create a new one
+ final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+ final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+ final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+ final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+ Assert.assertEquals(numRecords * 2, numRecordsInCounter.getCount());
+ Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+ }
+ }
+
+ private void addChainedOperator() {
+ final TaskConfig chainedConfig = new TaskConfig(new Configuration());
+
+ // input
+ chainedConfig.addInputToGroup(0);
+ chainedConfig.setInputSerializer(serFact, 0);
+
+ // output
+ chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
+ chainedConfig.setOutputSerializer(serFact);
+
+ // driver
+ chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);
+
+ // udf
+ chainedConfig.setStubWrapper(new UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class));
+
+ getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, chainedConfig, CHAINED_OPERATOR_NAME);
+ }
+
+ /**
+ * Simple {@link FlatMapFunction} that duplicates the input.
+ */
+ public static class DuplicatingFlatMapFunction extends RichFlatMapFunction<Record, Record> {
+
+ private static final long serialVersionUID = -1152068682935346164L;
+
+ @Override
+ public void flatMap(final Record value, final Collector<Record> out) throws Exception {
+ out.collect(value);
+ out.collect(value);
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 4bf94e9..68858bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.TaskStateManager;
@@ -108,6 +107,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
private Optional<Throwable> actualExternalFailureCause = Optional.empty();
+ private final TaskMetricGroup taskMetricGroup;
+
public static MockEnvironmentBuilder builder() {
return new MockEnvironmentBuilder();
}
@@ -125,7 +126,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
int maxParallelism,
int parallelism,
int subtaskIndex,
- ClassLoader userCodeClassLoader) {
+ ClassLoader userCodeClassLoader,
+ TaskMetricGroup taskMetricGroup) {
this.jobID = jobID;
this.jobVertexID = jobVertexID;
@@ -150,6 +152,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
+
+ this.taskMetricGroup = taskMetricGroup;
}
@@ -213,7 +217,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
@Override
public TaskMetricGroup getMetricGroup() {
- return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+ return taskMetricGroup;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfb10d4..dfcc5f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
@@ -40,6 +42,7 @@ public class MockEnvironmentBuilder {
private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
private JobID jobID = new JobID();
private JobVertexID jobVertexID = new JobVertexID();
+ private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
public MockEnvironmentBuilder setTaskName(String taskName) {
this.taskName = taskName;
@@ -106,6 +109,11 @@ public class MockEnvironmentBuilder {
return this;
}
+ public MockEnvironmentBuilder setMetricGroup(TaskMetricGroup taskMetricGroup) {
+ this.taskMetricGroup = taskMetricGroup;
+ return this;
+ }
+
public MockEnvironment build() {
return new MockEnvironment(
jobID,
@@ -120,6 +128,7 @@ public class MockEnvironmentBuilder {
maxParallelism,
parallelism,
subtaskIndex,
- userCodeClassLoader);
+ userCodeClassLoader,
+ taskMetricGroup);
}
}