You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/16 21:50:01 UTC
[flink] 03/03: [hotfix] Remove Mockito from MetricFetcherTest
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7d98940408d4d55cc40ef3622386036cf76d78e7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 10 18:09:19 2019 +0100
[hotfix] Remove Mockito from MetricFetcherTest
---
.../handler/legacy/metrics/MetricFetcherTest.java | 60 +++++++----------
.../TestingMetricQueryServiceGateway.java | 76 ++++++++++++++++++++++
2 files changed, 99 insertions(+), 37 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index b478893..16ed248 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -30,8 +30,6 @@ import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
@@ -41,39 +39,29 @@ import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.TestingMetricQueryServiceGateway;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
/**
* Tests for the MetricFetcher.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(MetricFetcher.class)
public class MetricFetcherTest extends TestLogger {
@Test
- public void testUpdate() throws Exception {
+ public void testUpdate() {
final Time timeout = Time.seconds(10L);
// ========= setup TaskManager =================================================================================
@@ -81,39 +69,37 @@ public class MetricFetcherTest extends TestLogger {
ResourceID tmRID = ResourceID.generate();
// ========= setup JobManager ==================================================================================
- JobDetails details = mock(JobDetails.class);
- when(details.getJobId()).thenReturn(jobID);
final String jmMetricQueryServicePath = "/jm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
final String tmMetricQueryServicePath = "/tm/" + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + tmRID.getResourceIdString();
- JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
-
- when(jobManagerGateway.requestMultipleJobDetails(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
- when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
- CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
- when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(
- CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, tmMetricQueryServicePath))));
+ final TestingRestfulGateway restfulGateway = new TestingRestfulGateway.Builder()
+ .setRequestMultipleJobDetailsSupplier(() -> CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())))
+ .setRequestMetricQueryServicePathsSupplier(() -> CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)))
+ .setRequestTaskManagerMetricQueryServicePathsSupplier(() -> CompletableFuture.completedFuture(Collections.singleton(Tuple2.of(tmRID, tmMetricQueryServicePath))))
+ .build();
- GatewayRetriever<JobManagerGateway> retriever = mock(AkkaJobManagerRetriever.class);
- when(retriever.getNow())
- .thenReturn(Optional.of(jobManagerGateway));
+ final GatewayRetriever<RestfulGateway> retriever = () -> CompletableFuture.completedFuture(restfulGateway);
// ========= setup QueryServices ================================================================================
- MetricQueryServiceGateway jmQueryService = mock(MetricQueryServiceGateway.class);
- MetricQueryServiceGateway tmQueryService = mock(MetricQueryServiceGateway.class);
+ final MetricQueryServiceGateway jmQueryService = new TestingMetricQueryServiceGateway.Builder()
+ .setQueryMetricsSupplier(() -> CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)))
+ .build();
MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer = createRequestDumpAnswer(tmRID, jobID);
+ final MetricQueryServiceGateway tmQueryService = new TestingMetricQueryServiceGateway.Builder()
+ .setQueryMetricsSupplier(() -> CompletableFuture.completedFuture(requestMetricsAnswer))
+ .build();
- when(jmQueryService.queryMetrics(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(new MetricDumpSerialization.MetricSerializationResult(new byte[0], new byte[0], new byte[0], new byte[0], 0, 0, 0, 0)));
- when(tmQueryService.queryMetrics(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(requestMetricsAnswer));
-
- MetricQueryServiceRetriever queryServiceRetriever = mock(MetricQueryServiceRetriever.class);
- when(queryServiceRetriever.retrieveService(eq(jmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(jmQueryService));
- when(queryServiceRetriever.retrieveService(eq(tmMetricQueryServicePath))).thenReturn(CompletableFuture.completedFuture(tmQueryService));
+ final MetricQueryServiceRetriever queryServiceRetriever = (path) -> {
+ if (path.equals(jmMetricQueryServicePath)) {
+ return CompletableFuture.completedFuture(jmQueryService);
+ } else if (path.equals(tmMetricQueryServicePath)) {
+ return CompletableFuture.completedFuture(tmQueryService);
+ } else {
+ throw new IllegalArgumentException("Unexpected argument.");
+ }
+ };
// ========= start MetricFetcher testing =======================================================================
MetricFetcher fetcher = new MetricFetcherImpl<>(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java
new file mode 100644
index 0000000..0ef1921
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/TestingMetricQueryServiceGateway.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Test double for the {@link MetricQueryServiceGateway}: metric queries are answered by a
+ * configurable supplier and the address is a fixed string. See {@link Builder} for defaults.
+ */
+public class TestingMetricQueryServiceGateway implements MetricQueryServiceGateway {
+
+	@Nonnull
+	private final String gatewayAddress;
+
+	@Nonnull
+	private final Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> metricsSupplier;
+
+	public TestingMetricQueryServiceGateway(@Nonnull Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier, @Nonnull String address) {
+		this.gatewayAddress = address;
+		this.metricsSupplier = queryMetricsSupplier;
+	}
+
+	@Override
+	public String getAddress() {
+		return gatewayAddress;
+	}
+
+	/** Ignores the given timeout and hands out whatever the configured supplier produces. */
+	@Override
+	public CompletableFuture<MetricDumpSerialization.MetricSerializationResult> queryMetrics(Time timeout) {
+		return metricsSupplier.get();
+	}
+
+	/** Fluent builder; defaults are the address {@code "localhost"} and a new, uncompleted future per query. */
+	public static class Builder {
+		private String gatewayAddress = "localhost";
+		private Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> metricsSupplier = CompletableFuture::new;
+
+		public Builder setAddress(String address) {
+			this.gatewayAddress = address;
+			return this;
+		}
+
+		public Builder setQueryMetricsSupplier(Supplier<CompletableFuture<MetricDumpSerialization.MetricSerializationResult>> queryMetricsSupplier) {
+			this.metricsSupplier = queryMetricsSupplier;
+			return this;
+		}
+
+		public TestingMetricQueryServiceGateway build() {
+			return new TestingMetricQueryServiceGateway(metricsSupplier, gatewayAddress);
+		}
+	}
+}