Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/03/14 08:49:16 UTC

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5697

    [FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

    ## What is the purpose of the change
    
    This PR ports the `ScheduleOrUpdateConsumersTest` to flip6. The existing test was renamed to `LegacyScheduleOrUpdateConsumersTest`, and a ported copy was added.
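
    For illustration only (not part of this PR's diff): a minimal sketch of what the flip6
    variant of the cluster setup typically looks like, with the legacy `TestingCluster`
    replaced by a directly managed `MiniCluster`. The class and method names used below
    (`MiniClusterConfiguration.Builder`, `executeJobBlocking`, `close`) are recalled
    assumptions and may not match the ported test exactly.

        // Hedged sketch, not the PR's code: flip6-style cluster setup for a job-graph test.
        import org.apache.flink.runtime.jobgraph.JobGraph;
        import org.apache.flink.runtime.minicluster.MiniCluster;
        import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

        public class Flip6ClusterSketch {

            private static final int NUMBER_OF_TMS = 2;
            private static final int NUMBER_OF_SLOTS_PER_TM = 2;

            static void runJobOnFlip6Cluster(JobGraph jobGraph) throws Exception {
                final MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(NUMBER_OF_TMS)
                        .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM)
                        .build();

                final MiniCluster miniCluster = new MiniCluster(config);
                try {
                    miniCluster.start();
                    // Submit the job and block until it finishes (assumed API).
                    miniCluster.executeJobBlocking(jobGraph);
                } finally {
                    // Exact shutdown method is an assumption; newer versions expose close().
                    miniCluster.close();
                }
            }
        }

    The legacy test keeps the TestingCluster/startTestingCluster setup shown in the quoted
    diff further down in this thread.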


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 8704_schedule

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5697.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5697
    
----
commit d7f42bf65177bb57f8159f7866397f4e55c5d9f0
Author: zentol <ch...@...>
Date:   2018-03-12T12:21:01Z

    [FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

----


---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5697#discussion_r178756067
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmanager;
    +
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
    +import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    +import org.apache.flink.runtime.jobgraph.DistributionPattern;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
    +import org.apache.flink.runtime.testingUtils.TestingCluster;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    +
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
    +
    +public class LegacyScheduleOrUpdateConsumersTest extends TestLogger {
    +
    +	private static final int NUMBER_OF_TMS = 2;
    +	private static final int NUMBER_OF_SLOTS_PER_TM = 2;
    +	private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
    +
    +	private static TestingCluster flink;
    +
    +	@BeforeClass
    +	public static void setUp() throws Exception {
    +		flink = TestingUtils.startTestingCluster(
    +				NUMBER_OF_SLOTS_PER_TM,
    +				NUMBER_OF_TMS,
    +				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		flink.stop();
    +	}
    +
    +	/**
    +	 * Tests notifications of multiple receivers when a task produces both a pipelined and blocking
    +	 * result.
    +	 *
    +	 * <pre>
    +	 *                             +----------+
    +	 *            +-- pipelined -> | Receiver |
    +	 * +--------+ |                +----------+
    +	 * | Sender |-|
    +	 * +--------+ |                +----------+
    +	 *            +-- blocking --> | Receiver |
    +	 *                             +----------+
    +	 * </pre>
    +	 *
    +	 * The pipelined receiver gets deployed after the first buffer is available and the blocking
    +	 * one after all subtasks are finished.
    +	 */
    +	@Test
    +	public void testMixedPipelinedAndBlockingResults() throws Exception {
    +		final JobVertex sender = new JobVertex("Sender");
    +		sender.setInvokableClass(BinaryRoundRobinSubtaskIndexSender.class);
    +		sender.getConfiguration().setInteger(BinaryRoundRobinSubtaskIndexSender.CONFIG_KEY, PARALLELISM);
    +		sender.setParallelism(PARALLELISM);
    +
    +		final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
    +		pipelinedReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
    +		pipelinedReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
    +		pipelinedReceiver.setParallelism(PARALLELISM);
    +
    +		pipelinedReceiver.connectNewDataSetAsInput(
    +				sender,
    +				DistributionPattern.ALL_TO_ALL,
    +				ResultPartitionType.PIPELINED);
    +
    +		final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
    +		blockingReceiver.setInvokableClass(SlotCountExceedingParallelismTest.SubtaskIndexReceiver.class);
    +		blockingReceiver.getConfiguration().setInteger(CONFIG_KEY, PARALLELISM);
    +		blockingReceiver.setParallelism(PARALLELISM);
    +
    +		blockingReceiver.connectNewDataSetAsInput(sender,
    +				DistributionPattern.ALL_TO_ALL,
    +				ResultPartitionType.BLOCKING);
    +
    +		SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
    +				sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
    +
    +		sender.setSlotSharingGroup(slotSharingGroup);
    +		pipelinedReceiver.setSlotSharingGroup(slotSharingGroup);
    +		blockingReceiver.setSlotSharingGroup(slotSharingGroup);
    +
    +		final JobGraph jobGraph = new JobGraph(
    +				"Mixed pipelined and blocking result",
    +				sender,
    +				pipelinedReceiver,
    +				blockingReceiver);
    --- End diff --
    
    Alright, if there is no easy way, then let's avoid the extra work and concentrate on quickly removing it afterwards.


---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5697#discussion_r177795751
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
    (quoted diff context identical to the first review comment above; omitted)
    --- End diff --
    
    Could we deduplicate this code?


---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5697#discussion_r178754924
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
    (quoted diff context identical to the first review comment above; omitted)
    --- End diff --
    
    AFAIK there is no dependency.
    
    It's just that, since the duality of these tests is limited to 1.5, I wouldn't generally take extra steps to re-use code. We would either have to create abstract base classes or move parts of the tests into public/package-private methods, which we would then have to revert again in 1.6 (at least in part).
    
    I'm not sure that is worth it, on top of the added risk of accidentally modifying the behavior of the old test.
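
    For illustration only (not part of the PR): a rough sketch of the abstract-base-class
    deduplication being discussed, with hypothetical names, to show the kind of indirection
    it would introduce and later have to be reverted once the legacy test is dropped. The
    job-graph construction mirrors the quoted diff; invokable classes and per-vertex
    configuration are omitted for brevity.

        // Hypothetical sketch -- names and structure are assumptions, not code from the PR.
        import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
        import org.apache.flink.runtime.jobgraph.DistributionPattern;
        import org.apache.flink.runtime.jobgraph.JobGraph;
        import org.apache.flink.runtime.jobgraph.JobVertex;
        import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
        import org.apache.flink.util.TestLogger;

        public abstract class AbstractScheduleOrUpdateConsumersTest extends TestLogger {

            protected static final int PARALLELISM = 4;

            // Legacy and flip6 subclasses would only differ in how they submit the job.
            protected abstract void submitJobAndWait(JobGraph jobGraph) throws Exception;

            // Shared construction of the "mixed pipelined and blocking result" job graph.
            protected JobGraph createMixedResultJobGraph() {
                final JobVertex sender = new JobVertex("Sender");
                final JobVertex pipelinedReceiver = new JobVertex("Pipelined Receiver");
                final JobVertex blockingReceiver = new JobVertex("Blocking Receiver");
                sender.setParallelism(PARALLELISM);
                pipelinedReceiver.setParallelism(PARALLELISM);
                blockingReceiver.setParallelism(PARALLELISM);

                pipelinedReceiver.connectNewDataSetAsInput(
                        sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
                blockingReceiver.connectNewDataSetAsInput(
                        sender, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

                final SlotSharingGroup group = new SlotSharingGroup(
                        sender.getID(), pipelinedReceiver.getID(), blockingReceiver.getID());
                sender.setSlotSharingGroup(group);
                pipelinedReceiver.setSlotSharingGroup(group);
                blockingReceiver.setSlotSharingGroup(group);

                return new JobGraph(
                        "Mixed pipelined and blocking result",
                        sender, pipelinedReceiver, blockingReceiver);
            }
        }

    Both concrete tests would then call createMixedResultJobGraph() and submitJobAndWait(...),
    which is exactly the extra structure the comment argues is not worth adding for a single
    release.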


---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5697


---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5697#discussion_r177804076
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
    (quoted diff context identical to the first review comment above; omitted)
    --- End diff --
    
    Yes, or is there a dependency on a static lock?


---

[GitHub] flink pull request #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumers...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5697#discussion_r177801606
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/LegacyScheduleOrUpdateConsumersTest.java ---
    (quoted diff context identical to the first review comment above; omitted)
    --- End diff --
    
    You mean sharing it across legacy/new versions?


---

[GitHub] flink issue #5697: [FLINK-8704][tests] Port ScheduleOrUpdateConsumersTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5697
  
    merging.


---