Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/10/02 15:13:08 UTC

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4761

    [FLINK-7748][network] properly use the TaskEventDispatcher for subscribing to events

    ## What is the purpose of the change
    
    `ResultPartitionWriter` currently implements the `EventListener` interface and is used for event registration, although event publishing is already handled via the `TaskEventDispatcher`.
    This PR unifies the two by using only the `TaskEventDispatcher` for both event registration and publishing.
    
    Please note that this PR builds upon #4759.
    
    ## Brief change log
    
    - make `TaskEventDispatcher` more generic to register result partitions via `ResultPartitionID` with one `TaskEventHandler` per partition (handled by `TaskEventDispatcher`, not inside `ResultPartitionWriter`)
    - remove the `EventListener<TaskEvent>` implementation from `ResultPartitionWriter`
    - add the `TaskEventDispatcher` to the `Environment` information for a task to be able to work with it (only `IterationHeadTask` is using this as of now)
    - adapt all places to use `TaskEventDispatcher` instead of `ResultPartitionWriter`
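
As a rough illustration of the register/subscribe/publish flow described in the change log, here is a minimal, self-contained sketch. It is not Flink's `TaskEventDispatcher`: the class `DispatcherSketch`, the `String` partition IDs, and the `Consumer<Object>` listeners are assumptions made for illustration. Only the behaviour checked by `TaskEventDispatcherTest` in this thread is mirrored (publishing to an unregistered partition returns `false`, double registration and subscribing to an unregistered partition throw `IllegalStateException`).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in; NOT Flink's TaskEventDispatcher.
class DispatcherSketch {
    // One handler (event type -> listeners) per registered partition ID.
    private final Map<String, Map<Class<?>, List<Consumer<Object>>>> handlers = new HashMap<>();

    void registerPartition(String partitionId) {
        if (handlers.putIfAbsent(partitionId, new HashMap<>()) != null) {
            throw new IllegalStateException(
                partitionId + " already registered at task event dispatcher");
        }
    }

    void subscribeToEvent(String partitionId, Consumer<Object> listener, Class<?> eventType) {
        Map<Class<?>, List<Consumer<Object>>> handler = handlers.get(partitionId);
        if (handler == null) {
            throw new IllegalStateException(
                partitionId + " not registered at task event dispatcher");
        }
        handler.computeIfAbsent(eventType, k -> new ArrayList<>()).add(listener);
    }

    // Returns false for an unknown partition, true once the event reached its handler
    // (even if no listener is subscribed). Matches on the exact event class only.
    boolean publish(String partitionId, Object event) {
        Map<Class<?>, List<Consumer<Object>>> handler = handlers.get(partitionId);
        if (handler == null) {
            return false;
        }
        for (Consumer<Object> listener : handler.getOrDefault(event.getClass(), new ArrayList<>())) {
            listener.accept(event);
        }
        return true;
    }
}
```

In this sketch the dispatcher owns the per-partition handler, so a writer-like class would no longer need to implement a listener interface itself.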
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - a new `TaskEventDispatcherTest` verifies that `TaskEventDispatcher` works as expected
    - indirectly, tests under `org.apache.flink.runtime.iterative` may exercise the `IterationHeadTask`, which is the only user so far
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (JavaDocs)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-7748

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4761.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4761
    
----
commit d0cf483e0fb21010ed996935ffb14d62b34ee8ed
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-29T15:32:52Z

    [FLINK-7746][network] move ResultPartitionWriter#writeBufferToAllChannels implementation up into ResultPartition

commit cfea094aa214612eaafef14554bc68626a6ff948
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-29T16:24:00Z

    [FLINK-7748][network] properly use the TaskEventDispatcher for subscribing to events
    
    Previously, the ResultPartitionWriter implemented the EventListener interface
    and was used for event registration, although event publishing was already
    handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
    both, event registration and publishing.
    
    It also adds the TaskEventDispatcher to the Environment information for a task
    to be able to work with it (only IterationHeadTask so far).

----


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154080335
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---
    @@ -190,4 +191,8 @@ public InputGate getInputGate(int index) {
     		return null;
     	}
     
    +	@Override
    +	public TaskEventDispatcher getTaskEventDispatcher() {
    +		return null;
    --- End diff --
    
    ok, let's try this out


---

[GitHub] flink issue #4761: [FLINK-7748][network] properly use the TaskEventDispatche...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4761
  
    merging.


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154027358
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---
    @@ -325,9 +325,10 @@ public void testRequestBackoffConfiguration() throws Exception {
     		int initialBackoff = 137;
     		int maxBackoff = 1001;
     
    +		TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
     		NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
     		when(netEnv.getResultPartitionManager()).thenReturn(new ResultPartitionManager());
    -		when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
    +		when(netEnv.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
    --- End diff --
    
    what is the purpose of this change?


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154027456
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---
    @@ -190,4 +191,8 @@ public InputGate getInputGate(int index) {
     		return null;
     	}
     
    +	@Override
    +	public TaskEventDispatcher getTaskEventDispatcher() {
    +		return null;
    --- End diff --
    
    `throw new UnsupportedOperationException()` ?
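
To illustrate the difference between the two options for a test stub, here is a small sketch. The interface and class names are hypothetical stand-ins, not Flink's `Environment` or `DummyEnvironment`, and the return type is simplified to `Object`.

```java
// Hypothetical stand-ins for illustration; not Flink's Environment or DummyEnvironment.
interface EnvironmentSketch {
    Object getTaskEventDispatcher();
}

class NullReturningEnvironment implements EnvironmentSketch {
    @Override
    public Object getTaskEventDispatcher() {
        // The problem only surfaces later, as a NullPointerException at the first use.
        return null;
    }
}

class ThrowingEnvironment implements EnvironmentSketch {
    @Override
    public Object getTaskEventDispatcher() {
        // The problem surfaces immediately, at the unsupported call itself.
        throw new UnsupportedOperationException(
            "no TaskEventDispatcher in this test environment");
    }
}
```

With the throwing variant, a test that unexpectedly relies on the dispatcher fails at the call site rather than somewhere downstream.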


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154026050
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.io.network;
    +
    +import org.apache.flink.runtime.event.TaskEvent;
    +import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    +import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
    +import org.apache.flink.runtime.iterative.event.TerminationEvent;
    +import org.apache.flink.runtime.util.event.EventListener;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.ExpectedException;
    +
    +import static junit.framework.TestCase.assertTrue;
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +/**
    + * Basic tests for {@link TaskEventDispatcher}.
    + */
    +public class TaskEventDispatcherTest extends TestLogger {
    +
    +	@Rule
    +	public ExpectedException expectedException = ExpectedException.none();
    +
    +	@Test
    +	public void registerPartitionTwice() throws Exception {
    +		ResultPartitionID partitionId = new ResultPartitionID();
    +		TaskEventDispatcher ted = new TaskEventDispatcher();
    +		ted.registerPartition(partitionId);
    +
    +		expectedException.expect(IllegalStateException.class);
    +		expectedException.expectMessage("already registered at task event dispatcher");
    +
    +		ted.registerPartition(partitionId);
    +	}
    +
    +	@Test
    +	public void subscribeToEventNotRegistered() throws Exception {
    +		TaskEventDispatcher ted = new TaskEventDispatcher();
    +
    +		expectedException.expect(IllegalStateException.class);
    +		expectedException.expectMessage("not registered at task event dispatcher");
    +
    +		//noinspection unchecked
    +		ted.subscribeToEvent(new ResultPartitionID(), mock(EventListener.class), TaskEvent.class);
    +	}
    +
    +	/**
    +	 * Tests {@link TaskEventDispatcher#publish(ResultPartitionID, TaskEvent)} and {@link TaskEventDispatcher#subscribeToEvent(ResultPartitionID, EventListener, Class)} methods.
    +	 */
    +	@Test
    +	public void publishSubscribe() throws Exception {
    +		ResultPartitionID partitionId1 = new ResultPartitionID();
    +		ResultPartitionID partitionId2 = new ResultPartitionID();
    +		TaskEventDispatcher ted = new TaskEventDispatcher();
    +
    +		AllWorkersDoneEvent event1 = new AllWorkersDoneEvent();
    +		assertFalse(ted.publish(partitionId1, event1));
    +
    +		ted.registerPartition(partitionId1);
    +		ted.registerPartition(partitionId2);
    +
    +		// no event listener subscribed yet, but the event is forwarded to a TaskEventHandler
    +		assertTrue(ted.publish(partitionId1, event1));
    +
    +		//noinspection unchecked
    +		EventListener<TaskEvent> eventListener1a = mock(EventListener.class);
    --- End diff --
    
    ```
     = new OneShotEventListener(event1);
     = new ZeroShotEventListener();
     ...
    ```


---

[GitHub] flink issue #4761: [FLINK-7748][network] properly use the TaskEventDispatche...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4761
  
    Could you resolve conflicts before I start reviewing?


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154021028
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java ---
    @@ -223,8 +225,10 @@ private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIte
     
     	private SuperstepBarrier initSuperstepBarrier() {
     		SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
    -		this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
    -		this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
    +		TaskEventDispatcher taskEventDispatcher = getEnvironment().getTaskEventDispatcher();
    +		ResultPartitionID partitionId = this.toSync.getPartitionId();
    --- End diff --
    
    nit: remove `this`


---

[GitHub] flink issue #4761: [FLINK-7748][network] properly use the TaskEventDispatche...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4761
  
    alright - didn't even need to do anything here since git auto-merged them :)


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154023982
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
    @@ -671,7 +671,7 @@ else if (current == ExecutionState.CANCELING) {
     				jobConfiguration, taskConfiguration, userCodeClassLoader,
     				memoryManager, ioManager, broadcastVariableManager,
     				accumulatorRegistry, kvStateRegistry, inputSplitProvider,
    -				distributedCacheEntries, writers, inputGates,
    +				distributedCacheEntries, writers, inputGates, network.getTaskEventDispatcher(),
    --- End diff --
    
    wow. Could you reformat this one with one parameter per line?


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154025597
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TaskEventDispatcherTest.java ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.io.network;
    +
    +import org.apache.flink.runtime.event.TaskEvent;
    +import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    +import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
    +import org.apache.flink.runtime.iterative.event.TerminationEvent;
    +import org.apache.flink.runtime.util.event.EventListener;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.ExpectedException;
    +
    +import static junit.framework.TestCase.assertTrue;
    +import static org.junit.Assert.assertFalse;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +/**
    + * Basic tests for {@link TaskEventDispatcher}.
    + */
    +public class TaskEventDispatcherTest extends TestLogger {
    +
    +	@Rule
    +	public ExpectedException expectedException = ExpectedException.none();
    +
    +	@Test
    +	public void registerPartitionTwice() throws Exception {
    +		ResultPartitionID partitionId = new ResultPartitionID();
    +		TaskEventDispatcher ted = new TaskEventDispatcher();
    +		ted.registerPartition(partitionId);
    +
    +		expectedException.expect(IllegalStateException.class);
    +		expectedException.expectMessage("already registered at task event dispatcher");
    +
    +		ted.registerPartition(partitionId);
    +	}
    +
    +	@Test
    +	public void subscribeToEventNotRegistered() throws Exception {
    +		TaskEventDispatcher ted = new TaskEventDispatcher();
    +
    +		expectedException.expect(IllegalStateException.class);
    +		expectedException.expectMessage("not registered at task event dispatcher");
    +
    +		//noinspection unchecked
    +		ted.subscribeToEvent(new ResultPartitionID(), mock(EventListener.class), TaskEvent.class);
    +	}
    +
    +	/**
    +	 * Tests {@link TaskEventDispatcher#publish(ResultPartitionID, TaskEvent)} and {@link TaskEventDispatcher#subscribeToEvent(ResultPartitionID, EventListener, Class)} methods.
    +	 */
    +	@Test
    --- End diff --
    
    Guess what? :)
    
    ```
    private static class OneShotEventListener implements EventListener<TaskEvent> {
      private final TaskEvent expected;
      private boolean fired = false;
      public OneShotEventListener(TaskEvent expected) {
        ...
      }
      
      public void onEvent(TaskEvent actual) {
         checkState(!fired);
         fired = true;
         checkArgument(actual == expected);
      }
    }
    
    private static class ZeroShotEventListener implements EventListener<TaskEvent> {
      public void onEvent(TaskEvent actual) {
         throw new IllegalStateException();
      }
    }
    ```
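
For readers who want to try the idea outside of Flink, the following is a self-contained variant of the two listeners. Everything here is an assumption for illustration: `ListenerSketch` stands in for Flink's `EventListener`, events are plain `Object`s, the constructor body and the `hasFired()` accessor are filled in by guesswork, and plain exceptions replace the `checkState`/`checkArgument` helpers.

```java
// Hypothetical stand-in for Flink's EventListener, so the sketch compiles on its own.
interface ListenerSketch<T> {
    void onEvent(T event);
}

// Accepts exactly one event, which must be the expected instance.
class OneShotListenerSketch implements ListenerSketch<Object> {
    private final Object expected;
    private boolean fired = false;

    OneShotListenerSketch(Object expected) {
        this.expected = expected;
    }

    @Override
    public void onEvent(Object actual) {
        if (fired) {
            throw new IllegalStateException("listener fired more than once");
        }
        fired = true;
        if (actual != expected) {
            throw new IllegalArgumentException("unexpected event instance");
        }
    }

    // Lets a test assert afterwards that the event actually arrived.
    boolean hasFired() {
        return fired;
    }
}

// Must never receive an event; any call is a test failure.
class ZeroShotListenerSketch implements ListenerSketch<Object> {
    @Override
    public void onEvent(Object actual) {
        throw new IllegalStateException("listener must never fire");
    }
}
```

Compared with mocks, such hand-written listeners fail at the moment of the unexpected call, and the `hasFired()` check covers the case where an expected event never arrives.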


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154067761
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ---
    @@ -34,12 +31,10 @@
      * The {@link ResultPartitionWriter} is the runtime API for producing results. It
      * supports two kinds of data to be sent: buffers and events.
      */
    -public class ResultPartitionWriter implements EventListener<TaskEvent> {
    --- End diff --
    
    A quick search reveals `NetworkEnvironment.java` calling `taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);`, so the user was `TaskEventDispatcher` itself. There, however, the `EventListener` interface was actually only used in `TaskEventDispatcher#publish()`, which is somewhat strange. Agreed.


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4761


---

[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4761#discussion_r154023066
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ---
    @@ -34,12 +31,10 @@
      * The {@link ResultPartitionWriter} is the runtime API for producing results. It
      * supports two kinds of data to be sent: buffers and events.
      */
    -public class ResultPartitionWriter implements EventListener<TaskEvent> {
    --- End diff --
    
    I do not understand why this class was previously implementing `EventListener`. I couldn't find any context in which it was used that way.


---