You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 13:03:37 UTC
[flink] 02/02: [hotfix][metrics] Rename TaskMetricGroup#addOperator
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4573d48008d3b19fa7e90ada7d2783ed02ba71d0
Author: zentol <ch...@apache.org>
AuthorDate: Mon Sep 3 10:44:07 2018 +0200
[hotfix][metrics] Rename TaskMetricGroup#addOperator
---
.../org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java | 6 +++---
.../flink/runtime/metrics/groups/UnregisteredMetricGroups.java | 2 +-
.../src/main/java/org/apache/flink/runtime/operators/BatchTask.java | 2 +-
.../main/java/org/apache/flink/runtime/operators/DataSinkTask.java | 2 +-
.../java/org/apache/flink/runtime/operators/DataSourceTask.java | 2 +-
.../org/apache/flink/runtime/operators/chaining/ChainedDriver.java | 2 +-
.../org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java | 2 +-
.../apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java | 2 +-
.../runtime/operators/chaining/ChainedOperatorsMetricTest.java | 4 ++--
.../flink/streaming/api/operators/AbstractStreamOperator.java | 2 +-
.../flink/streaming/runtime/tasks/OneInputStreamTaskTest.java | 6 +++---
.../flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java | 6 +++---
12 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 124fbf2..39d98d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -134,11 +134,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
// operators and cleanup
// ------------------------------------------------------------------------
- public OperatorMetricGroup addOperator(String name) {
- return addOperator(OperatorID.fromJobVertexID(vertexId), name);
+ public OperatorMetricGroup getOrAddOperator(String name) {
+ return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), name);
}
- public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+ public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 3869aa6..8c635eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -145,7 +145,7 @@ public class UnregisteredMetricGroups {
}
@Override
- public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+ public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
return createUnregisteredOperatorMetricGroup();
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index e697869..d5f2fd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -254,7 +254,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
String headName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
this.metrics = getEnvironment().getMetricGroup()
- .addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
+ .getOrAddOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
if (config.getNumberOfChainedStubs() == 0) {
this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 0ea376e..2c26301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -409,6 +409,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
- getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
+ getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index cdfe1fa..a5ccfab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -402,6 +402,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(),
- getEnvironment().getMetricGroup().addOperator(sourceName));
+ getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 442a53c..7dd3a14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -69,7 +69,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
this.config = config;
this.taskName = taskName;
this.userCodeClassLoader = userCodeClassLoader;
- this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
+ this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 58198e4..2f62f55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -93,7 +93,7 @@ public class OperatorGroupTest extends TestLogger {
OperatorMetricGroup operatorGroup =
new TaskManagerMetricGroup(registry, "theHostName", tmID)
.addTaskForJob(jid, "myJobName", vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
- .addOperator(operatorID, operatorName);
+ .getOrAddOperator(operatorID, operatorName);
assertArrayEquals(
new String[]{tmID, jid.toString(), vertexId.toString(), operatorName, operatorID.toString()},
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index d9e6158..079c7c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -176,7 +176,7 @@ public class TaskMetricGroupTest extends TestLogger {
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new JobVertexID(), new AbstractID(), "task", 0, 0);
String originalName = new String(new char[100]).replace("\0", "-");
- OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+ OperatorMetricGroup operatorMetricGroup = taskMetricGroup.getOrAddOperator(originalName);
String storedName = operatorMetricGroup.getScopeComponents()[0];
Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
index 29ff6e8..c393af5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -117,7 +117,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
// verify head operator metrics
{
// this only returns the existing group and doesn't create a new one
- final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+ final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(HEAD_OPERATOR_NAME);
final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
@@ -129,7 +129,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
// verify chained operator metrics
{
// this only returns the existing group and doesn't create a new one
- final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+ final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME);
final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 52ba3e4..f52168b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -171,7 +171,7 @@ public abstract class AbstractStreamOperator<OUT>
this.container = containingTask;
this.config = config;
try {
- OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().addOperator(config.getOperatorID(), config.getOperatorName());
+ OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
if (config.isChainStart()) {
operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 96eaa78..f6d5021 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -626,7 +626,7 @@ public class OneInputStreamTaskTest extends TestLogger {
final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
@Override
- public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+ public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
}
};
@@ -682,13 +682,13 @@ public class OneInputStreamTaskTest extends TestLogger {
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
- public OperatorMetricGroup addOperator(OperatorID id, String name) {
+ public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
- return super.addOperator(id, name);
+ return super.getOrAddOperator(id, name);
}
}
};
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 5d15157..c48b647 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -402,7 +402,7 @@ public class TwoInputStreamTaskTest {
final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
@Override
- public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+ public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
}
};
@@ -470,13 +470,13 @@ public class TwoInputStreamTaskTest {
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
- public OperatorMetricGroup addOperator(OperatorID id, String name) {
+ public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
- return super.addOperator(id, name);
+ return super.getOrAddOperator(id, name);
}
}
};