You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 13:03:37 UTC

[flink] 02/02: [hotfix][metrics] Rename TaskMetricGroup#addOperator to getOrAddOperator

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4573d48008d3b19fa7e90ada7d2783ed02ba71d0
Author: zentol <ch...@apache.org>
AuthorDate: Mon Sep 3 10:44:07 2018 +0200

    [hotfix][metrics] Rename TaskMetricGroup#addOperator to getOrAddOperator
---
 .../org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java    | 6 +++---
 .../flink/runtime/metrics/groups/UnregisteredMetricGroups.java      | 2 +-
 .../src/main/java/org/apache/flink/runtime/operators/BatchTask.java | 2 +-
 .../main/java/org/apache/flink/runtime/operators/DataSinkTask.java  | 2 +-
 .../java/org/apache/flink/runtime/operators/DataSourceTask.java     | 2 +-
 .../org/apache/flink/runtime/operators/chaining/ChainedDriver.java  | 2 +-
 .../org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java  | 2 +-
 .../apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java    | 2 +-
 .../runtime/operators/chaining/ChainedOperatorsMetricTest.java      | 4 ++--
 .../flink/streaming/api/operators/AbstractStreamOperator.java       | 2 +-
 .../flink/streaming/runtime/tasks/OneInputStreamTaskTest.java       | 6 +++---
 .../flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java       | 6 +++---
 12 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 124fbf2..39d98d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -134,11 +134,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	//  operators and cleanup
 	// ------------------------------------------------------------------------
 
-	public OperatorMetricGroup addOperator(String name) {
-		return addOperator(OperatorID.fromJobVertexID(vertexId), name);
+	public OperatorMetricGroup getOrAddOperator(String name) {
+		return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), name);
 	}
 
-	public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+	public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 		if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
 			LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
 			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 3869aa6..8c635eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -145,7 +145,7 @@ public class UnregisteredMetricGroups {
 		}
 
 		@Override
-		public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+		public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 			return createUnregisteredOperatorMetricGroup();
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index e697869..d5f2fd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -254,7 +254,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 		String headName =  getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
 		this.metrics = getEnvironment().getMetricGroup()
-			.addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
+			.getOrAddOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
 		this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
 		if (config.getNumberOfChainedStubs() == 0) {
 			this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 0ea376e..2c26301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -409,6 +409,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
+				getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index cdfe1fa..a5ccfab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -402,6 +402,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().addOperator(sourceName));
+				getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 442a53c..7dd3a14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -69,7 +69,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 		this.config = config;
 		this.taskName = taskName;
 		this.userCodeClassLoader = userCodeClassLoader;
-		this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
+		this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
 		this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
 		this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
 		this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 58198e4..2f62f55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -93,7 +93,7 @@ public class OperatorGroupTest extends TestLogger {
 			OperatorMetricGroup operatorGroup =
 				new TaskManagerMetricGroup(registry, "theHostName", tmID)
 					.addTaskForJob(jid, "myJobName", vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
-					.addOperator(operatorID, operatorName);
+					.getOrAddOperator(operatorID, operatorName);
 
 			assertArrayEquals(
 				new String[]{tmID, jid.toString(), vertexId.toString(), operatorName, operatorID.toString()},
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index d9e6158..079c7c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -176,7 +176,7 @@ public class TaskMetricGroupTest extends TestLogger {
 		TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new JobVertexID(), new AbstractID(), "task", 0, 0);
 
 		String originalName = new String(new char[100]).replace("\0", "-");
-		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.getOrAddOperator(originalName);
 
 		String storedName = operatorMetricGroup.getScopeComponents()[0];
 		Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
index 29ff6e8..c393af5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -117,7 +117,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
 		// verify head operator metrics
 		{
 			// this only returns the existing group and doesn't create a new one
-			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(HEAD_OPERATOR_NAME);
 			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
 			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
 			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
@@ -129,7 +129,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
 		// verify chained operator metrics
 		{
 			// this only returns the existing group and doesn't create a new one
-			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME);
 			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
 			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
 			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 52ba3e4..f52168b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -171,7 +171,7 @@ public abstract class AbstractStreamOperator<OUT>
 		this.container = containingTask;
 		this.config = config;
 		try {
-			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().addOperator(config.getOperatorID(), config.getOperatorName());
+			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
 			this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
 			if (config.isChainStart()) {
 				operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 96eaa78..f6d5021 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -626,7 +626,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
 			}
 		};
@@ -682,13 +682,13 @@ public class OneInputStreamTaskTest extends TestLogger {
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID id, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
 					return headOperatorMetricGroup;
 				} else if (id.equals(chainedOperatorId)) {
 					return chainedOperatorMetricGroup;
 				} else {
-					return super.addOperator(id, name);
+					return super.getOrAddOperator(id, name);
 				}
 			}
 		};
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 5d15157..c48b647 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -402,7 +402,7 @@ public class TwoInputStreamTaskTest {
 
 		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
 			}
 		};
@@ -470,13 +470,13 @@ public class TwoInputStreamTaskTest {
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID id, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
 					return headOperatorMetricGroup;
 				} else if (id.equals(chainedOperatorId)) {
 					return chainedOperatorMetricGroup;
 				} else {
-					return super.addOperator(id, name);
+					return super.getOrAddOperator(id, name);
 				}
 			}
 		};