You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/14 11:59:26 UTC

flink git commit: [FLINK-4774] [metrics] Fix scope concatenation in QueryScopeInfo

Repository: flink
Updated Branches:
  refs/heads/master 21e8e2dcf -> 9948d4844


[FLINK-4774] [metrics] Fix scope concatenation in QueryScopeInfo

This closes #2613.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9948d484
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9948d484
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9948d484

Branch: refs/heads/master
Commit: 9948d48443460a881eb79983f586c7bd18201674
Parents: 21e8e2d
Author: zentol <ch...@apache.org>
Authored: Fri Oct 7 13:11:58 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 14 13:58:58 2016 +0200

----------------------------------------------------------------------
 .../runtime/metrics/dump/QueryScopeInfo.java    |  16 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  21 ++-
 .../metrics/groups/QueryScopeInfoTest.java      | 156 +++++++++++++++++++
 3 files changed, 181 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
index df5c2bf..6572ca0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -50,6 +50,12 @@ public abstract class QueryScopeInfo {
      */
 	public abstract byte getCategory();
 
+	protected String concatScopes(String additionalScope) {
+		return scope.isEmpty()
+			? additionalScope
+			: scope + "." + additionalScope;
+	}
+
 	/**
 	 * Container for the job manager scope. Stores no additional information.
      */
@@ -64,7 +70,7 @@ public abstract class QueryScopeInfo {
 
 		@Override
 		public JobManagerQueryScopeInfo copy(String additionalScope) {
-			return new JobManagerQueryScopeInfo(this.scope + additionalScope);
+			return new JobManagerQueryScopeInfo(concatScopes(additionalScope));
 		}
 
 		@Override
@@ -90,7 +96,7 @@ public abstract class QueryScopeInfo {
 
 		@Override
 		public TaskManagerQueryScopeInfo copy(String additionalScope) {
-			return new TaskManagerQueryScopeInfo(this.taskManagerID, this.scope + additionalScope);
+			return new TaskManagerQueryScopeInfo(this.taskManagerID, concatScopes(additionalScope));
 		}
 
 		@Override
@@ -116,7 +122,7 @@ public abstract class QueryScopeInfo {
 
 		@Override
 		public JobQueryScopeInfo copy(String additionalScope) {
-			return new JobQueryScopeInfo(this.jobID, this.scope + additionalScope);
+			return new JobQueryScopeInfo(this.jobID, concatScopes(additionalScope));
 		}
 
 		@Override
@@ -146,7 +152,7 @@ public abstract class QueryScopeInfo {
 
 		@Override
 		public TaskQueryScopeInfo copy(String additionalScope) {
-			return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.scope + additionalScope);
+			return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, concatScopes(additionalScope));
 		}
 
 		@Override
@@ -178,7 +184,7 @@ public abstract class QueryScopeInfo {
 
 		@Override
 		public OperatorQueryScopeInfo copy(String additionalScope) {
-			return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, this.scope + additionalScope);
+			return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, concatScopes(additionalScope));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 038fd1e..6a6e7aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -128,13 +128,20 @@ public class MetricGroupTest extends TestLogger {
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
-		GenericMetricGroup userGroup = new GenericMetricGroup(registry, task, "hello");
-
-		QueryScopeInfo.TaskQueryScopeInfo info = (QueryScopeInfo.TaskQueryScopeInfo) userGroup.createQueryServiceMetricInfo(new DummyCharacterFilter());
-		assertEquals("hello", info.scope);
-		assertEquals(jid.toString(), info.jobID);
-		assertEquals(vid.toString(), info.vertexID);
-		assertEquals(4, info.subtaskIndex);
+		GenericMetricGroup userGroup1 = new GenericMetricGroup(registry, task, "hello");
+		GenericMetricGroup userGroup2 = new GenericMetricGroup(registry, userGroup1, "world");
+
+		QueryScopeInfo.TaskQueryScopeInfo info1 = (QueryScopeInfo.TaskQueryScopeInfo) userGroup1.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("hello", info1.scope);
+		assertEquals(jid.toString(), info1.jobID);
+		assertEquals(vid.toString(), info1.vertexID);
+		assertEquals(4, info1.subtaskIndex);
+
+		QueryScopeInfo.TaskQueryScopeInfo info2 = (QueryScopeInfo.TaskQueryScopeInfo) userGroup2.createQueryServiceMetricInfo(new DummyCharacterFilter());
+		assertEquals("hello.world", info2.scope);
+		assertEquals(jid.toString(), info2.jobID);
+		assertEquals(vid.toString(), info2.vertexID);
+		assertEquals(4, info2.subtaskIndex);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
new file mode 100644
index 0000000..1ff804a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class QueryScopeInfoTest {
+	@Test
+	public void testJobManagerQueryScopeInfo() {
+		QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo();
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
+		assertEquals("", info.scope);
+		
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
+		assertEquals("world", info.scope);
+
+		info = new QueryScopeInfo.JobManagerQueryScopeInfo("hello");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
+		assertEquals("hello", info.scope);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory());
+		assertEquals("hello.world", info.scope);
+	}
+
+	@Test
+	public void testTaskManagerQueryScopeInfo() {
+		QueryScopeInfo.TaskManagerQueryScopeInfo info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
+		assertEquals("", info.scope);
+		assertEquals("tmid", info.taskManagerID);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
+		assertEquals("world", info.scope);
+		assertEquals("tmid", info.taskManagerID);
+		
+		info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "hello");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
+		assertEquals("hello", info.scope);
+		assertEquals("tmid", info.taskManagerID);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory());
+		assertEquals("hello.world", info.scope);
+		assertEquals("tmid", info.taskManagerID);
+	}
+
+	@Test
+	public void testJobQueryScopeInfo() {
+		QueryScopeInfo.JobQueryScopeInfo info = new QueryScopeInfo.JobQueryScopeInfo("jobid");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
+		assertEquals("", info.scope);
+		assertEquals("jobid", info.jobID);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
+		assertEquals("world", info.scope);
+		assertEquals("jobid", info.jobID);
+
+		info = new QueryScopeInfo.JobQueryScopeInfo("jobid", "hello");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
+		assertEquals("hello", info.scope);
+		assertEquals("jobid", info.jobID);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory());
+		assertEquals("hello.world", info.scope);
+		assertEquals("jobid", info.jobID);
+	}
+
+	@Test
+	public void testTaskQueryScopeInfo() {
+		QueryScopeInfo.TaskQueryScopeInfo info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2);
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
+		assertEquals("", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals(2, info.subtaskIndex);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
+		assertEquals("world", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals(2, info.subtaskIndex);
+
+		info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, "hello");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
+		assertEquals("hello", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals(2, info.subtaskIndex);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory());
+		assertEquals("hello.world", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals(2, info.subtaskIndex);
+	}
+
+	@Test
+	public void testOperatorQueryScopeInfo() {
+		QueryScopeInfo.OperatorQueryScopeInfo info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
+		assertEquals("", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals("opname", info.operatorName);
+		assertEquals(2, info.subtaskIndex);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
+		assertEquals("world", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals("opname", info.operatorName);
+		assertEquals(2, info.subtaskIndex);
+
+		info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname", "hello");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
+		assertEquals("hello", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals("opname", info.operatorName);
+		assertEquals(2, info.subtaskIndex);
+
+		info = info.copy("world");
+		assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory());
+		assertEquals("hello.world", info.scope);
+		assertEquals("jobid", info.jobID);
+		assertEquals("taskid", info.vertexID);
+		assertEquals("opname", info.operatorName);
+		assertEquals(2, info.subtaskIndex);
+	}
+}