You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/16 21:50:01 UTC

[flink] 03/03: [hotfix] Remove Mockito from MetricFetcherTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d98940408d4d55cc40ef3622386036cf76d78e7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 10 18:09:19 2019 +0100

    [hotfix] Remove Mockito from MetricFetcherTest
---
 .../handler/legacy/metrics/MetricFetcherTest.java  | 60 +++++++----------
 .../TestingMetricQueryServiceGateway.java          | 76 ++++++++++++++++++++++
 2 files changed, 99 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index b478893..16ed248 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -30,8 +30,6 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestHistogram;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
@@ -41,39 +39,29 @@ import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.TestingMetricQueryServiceGateway;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import javax.annotation.Nonnull;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the MetricFetcher.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(MetricFetcher.class)
 public class MetricFetcherTest extends TestLogger {
 	@Test
-	public void testUpdate() throws Exception {
+	public void testUpdate() {
 		final Time timeout = Time.seconds(10L);
 
 		// ========= setup TaskManager =================================================================================
@@ -81,39 +69,37 @@ public class MetricFetcherTest extends TestLogger {
 		ResourceID tmRID = ResourceID.generate();
 
 		// ========= setup JobManager ==================================================================================
-		JobDetails details = mock(JobDetails.class);
-		when(details.getJobId()).thenReturn(jobID);
 
 		final String jmMetricQueryServicePath = "/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
 		final String tmMetricQueryServicePath = "/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString();
 
-		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-
-		when(jobManagerGateway.requestMultipleJobDetails(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
-		when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
-			CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
-		when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(
-			CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, tmMetricQueryServicePath))));
+		final TestingRestfulGateway restfulGateway = new TestingRestfulGateway.Builder()
+			.setRequestMultipleJobDetailsSupplier(() -> CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())))
+			.setRequestMetricQueryServicePathsSupplier(() -> CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)))
+			.setRequestTaskManagerMetricQueryServicePathsSupplier(() -> CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, tmMetricQueryServicePath))))
+			.build();
 
-		GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
-		when(retriever.getNow())
-			.thenReturn(Optional.of(jobManagerGateway));
+		final GatewayRetriever<RestfulGateway> retriever = () -> CompletableFuture.completedFuture(restfulGateway);
 
 		// ========= setup QueryServices ================================================================================
-		MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
-		MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
+		final MetricQueryServiceGateway jmQueryService = new TestingMetricQueryServiceGateway.Builder()
+			.setQueryMetricsSupplier(() -> CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)))
+			.build();
 
 		MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
+		final MetricQueryServiceGateway tmQueryService = new TestingMetricQueryServiceGateway.Builder()
+			.setQueryMetricsSupplier(() -> CompletableFuture.completedFuture(requestMetricsAnswer))
+			.build();
 
-		when(jmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)));
-		when(tmQueryService.queryMetrics(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
-
-		MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
-		when(queryServiceRetriever.retrieveService(eq(jmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
-		when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
+		final MetricQueryServiceRetriever queryServiceRetriever = (path) -> {
+			if (path.equals(jmMetricQueryServicePath))  {
+				return CompletableFuture.completedFuture(jmQueryService);
+			} else if (path.equals(tmMetricQueryServicePath)) {
+				return CompletableFuture.completedFuture(tmQueryService);
+			} else {
+				throw new IllegalArgumentException("Unexpected argument.");
+			}
+		};
 
 		// ========= start MetricFetcher testing =======================================================================
 		MetricFetcher fetcher = new MetricFetcherImpl<>(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java
new file mode 100644
index 0000000..0ef1921
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of the {@link MetricQueryServiceGateway}: metric queries are answered by a caller-provided supplier and the address is a fixed string, both handed in at construction time.
+ */
+public class TestingMetricQueryServiceGateway implements MetricQueryServiceGateway {
+
+	@Nonnull // invoked on every queryMetrics call; whatever it returns is passed through unchanged
+	private final Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier;
+
+	@Nonnull // the value reported by getAddress
+	private final String address;
+
+	public TestingMetricQueryServiceGateway(@Nonnull Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier, @Nonnull String address) { // stores both arguments as given; no runtime null check is performed here
+		this.queryMetricsSupplier = queryMetricsSupplier;
+		this.address = address;
+	}
+
+	@Override // the timeout argument is not used; the configured supplier alone determines the returned future
+	public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
+		return queryMetricsSupplier.get();
+	}
+
+	@Override // returns the address that was passed to the constructor
+	public String getAddress() {
+		return address;
+	}
+
+	/**
+	 * Builder for the {@link TestingMetricQueryServiceGateway}. Unless overridden, the built gateway answers each query with a new future that this class never completes, and reports the address "localhost".
+	 */
+	public static class Builder {
+		private Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier = CompletableFuture::new; // default: a fresh, not-yet-completed future per call
+		private String address = "localhost"; // default address
+
+		public Builder setQueryMetricsSupplier(Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier) { // replaces the supplier backing queryMetrics; returns this builder for chaining
+			this.queryMetricsSupplier = queryMetricsSupplier;
+			return this;
+		}
+
+		public Builder setAddress(String address) { // replaces the address reported by getAddress; returns this builder for chaining
+			this.address = address;
+			return this;
+		}
+
+		public TestingMetricQueryServiceGateway build() { // creates a gateway from the currently configured supplier and address
+			return new TestingMetricQueryServiceGateway(queryMetricsSupplier, address);
+		}
+	}
+}