You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/04/03 08:56:02 UTC

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5697#discussion_r178754924
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager;
    +
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
    +import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    +import org.apache.flink.runtime.jobgraph.DistributionPattern;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
    +import org.apache.flink.runtime.testingUtils.TestingCluster;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    +
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
    +
    +public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
    +
    +	private static final int NUMBER_OF_TMS = 2;
    +	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    +	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
    +
    +	private static TestingCluster flink;
    +
    +	@BeforeClass
    +	public static void setUp() throws Exception {
    +		flink = TestingUtils.startTestingCluster(
    +				NUMBER_OF_SLOTS_PER_TM,
    +				NUMBER_OF_TMS,
    +				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		flink.stop();
    +	}
    +
    +	/**
    +	 * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
    +	 * result.
    +	 *
    +	 * <pre>
    +	 *                             +----------+
    +	 *            +-- pipelined -> | Receiver |
    +	 * +--------+ |                +----------+
    +	 * | Sender |-|
    +	 * +--------+ |                +----------+
    +	 *            +-- blocking --> | Receiver |
    +	 *                             +----------+
    +	 * </pre>
    +	 *
    +	 * The pipelined receiver gets deployed after the first buffer is available and the blocking
    +	 * one after all subtasks are finished.
    +	 */
    +	@Test
    +	public void testMixedPipelinedAndBlockingResults() throws Exception {
    +		final JobVertex sender = new JobVertex("Sender");
    +		sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
    +		sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
    +		sender.setParallelism(PARALLELISM);
    +
    +		final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
    +		pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
    +		pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
    +		pipelinedReceiver.setParallelism(PARALLELISM);
    +
    +		pipelinedReceiver.connectNewDataSetAsInput(
    +				sender,
    +				DistributionPattern.ALL_TO_ALL,
    +				ResultPartitionType.PIPELINED);
    +
    +		final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
    +		blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
    +		blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
    +		blockingReceiver.setParallelism(PARALLELISM);
    +
    +		blockingReceiver.connectNewDataSetAsInput(sender,
    +				DistributionPattern.ALL_TO_ALL,
    +				ResultPartitionType.BLOCKING);
    +
    +		SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
    +				sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
    +
    +		sender.setSlotSharingGroup(slotSharingGroup);
    +		pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
    +		blockingReceiver.setSlotSharingGroup(slotSharingGroup);
    +
    +		final JobGraph jobGraph = new JobGraph(
    +				"Mixed pipelined and blocking result",
    +				sender,
    +				pipelinedReceiver,
    +				blockingReceiver);
    --- End diff --
    
    AFAIK there is no dependency.
    
    It's just that given that the duality of these tests is limited to 1.5 I wouldn't in general take extra steps to re-use code. We would either have to create abstract base classes or move parts of the tests into public/package-private methods, which then we have to revert again in 1.6 (at least in part).
    
    I'm questioning whether that is worth it, besides the added risk of accidentally modifying the behavior of the old test.


---