Posted to issues@flink.apache.org by ifndef-SleePy <gi...@git.apache.org> on 2017/02/23 05:53:12 UTC

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

GitHub user ifndef-SleePy opened a pull request:

    https://github.com/apache/flink/pull/3395

    [FLINK-5861] Components of TaskManager support updating JobManagerConnection

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-5861

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3395

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104131595
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1114,6 +1122,12 @@ public void run() {
     		public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
     			TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, taskExecutionState);
     		}
    +
    +		@Override
    +		public void notifyJobManagerConnectionChanged(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderID) {
    +			this.jobMasterGateway = jobMasterGateway;
    +			this.jobMasterLeaderId = jobMasterLeaderID;
    --- End diff --
    
    What about a `null` check?
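    
    A minimal sketch of the kind of check being asked for, assuming the listener should reject `null` arguments before overwriting its fields. `GatewaySketch` and `TaskManagerActionsSketch` are hypothetical stand-ins, not Flink classes. The sketch uses `java.util.Objects.requireNonNull` so that it compiles on its own; the surrounding Flink code in this pull request uses `Preconditions.checkNotNull` for the same purpose.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for Flink's JobMasterGateway, so the sketch compiles on its own.
interface GatewaySketch {}

// Hypothetical component that keeps a reference to the current job master.
final class TaskManagerActionsSketch {
    private GatewaySketch jobMasterGateway;
    private UUID jobMasterLeaderId;

    // Validates both arguments before touching any field, so a rejected call
    // leaves the previous connection details in place.
    void notifyJobManagerConnectionChanged(GatewaySketch newGateway, UUID newLeaderId) {
        Objects.requireNonNull(newGateway, "jobMasterGateway must not be null");
        Objects.requireNonNull(newLeaderId, "jobMasterLeaderID must not be null");
        this.jobMasterGateway = newGateway;
        this.jobMasterLeaderId = newLeaderId;
    }

    GatewaySketch getJobMasterGateway() {
        return jobMasterGateway;
    }

    UUID getJobMasterLeaderId() {
        return jobMasterLeaderId;
    }
}
```

    Checking both arguments before either assignment means a failed call cannot leave the gateway and the leader ID out of step with each other.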


---

[GitHub] flink issue #3395: [FLINK-5861] Components of TaskManager support updating J...

Posted by ifndef-SleePy <gi...@git.apache.org>.
Github user ifndef-SleePy commented on the issue:

    https://github.com/apache/flink/pull/3395
  
    Hi Till, I almost missed these comments! I didn't see them until a few minutes ago.
    
    I fully understand your concern. Let me explain a bit more in response to your comments.
    1. I agree with most of your suggestions, such as the null check, the formatting problem, and TestLogger.
    2. Currently the synchronization problem will not happen. I think replacing the field value is safe, since that is an atomic operation. Correct me if I'm wrong.
    3. This PR will not work without the other reconciliation PRs, because nobody will notify these listeners. We have actually implemented reconnection between TM and JM, and this PR will work with that code. The reason I opened this PR without the other reconciliation PRs is that I think it is independent of the other parts. I believe filing one huge PR is terrible for both the reviewer and the writer. However, this single PR confused you. Sorry about that.
    4. Actually, I'm not sure the listener pattern is the best way to do this, but I think it's the simplest way and requires the fewest modifications to the current implementation. If the TM reconnects to a new JM, how can we update the JobMasterGateway held by the components? I can't figure out a better way other than reimplementing these components.
    
    Anyway, thank you for reviewing and for so many comments! I agree with you that we should close this PR for now. After we reach an agreement on the main reconciliation PRs, we can talk about what this PR tries to implement.


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104130678
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnectionListener.java ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +
    +import java.util.UUID;
    +
    +/**
    + * Listener that will be notified when {@link JobManagerConnection} changed.
    + */
    +public interface JobManagerConnectionListener {
    +	/**
    +	 * Notify job manager connection changed.
    +	 *
    +	 * @param jobMasterGateway   the job master gateway
    --- End diff --
    
    There is a tab or multiple spaces of indentation after `jobMasterGateway`.


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104131696
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java ---
    @@ -58,4 +62,9 @@ public void declineCheckpoint(
     
     		checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause);
     	}
    +
    +	@Override
    +	public void notifyJobManagerConnectionChanged(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderID) {
    +		this.checkpointCoordinatorGateway = jobMasterGateway;
    --- End diff --
    
    `null`?


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by ifndef-SleePy <gi...@git.apache.org>.
Github user ifndef-SleePy closed the pull request at:

    https://github.com/apache/flink/pull/3395


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104135197
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1114,6 +1122,12 @@ public void run() {
     		public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
     			TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, taskExecutionState);
     		}
    +
    +		@Override
    +		public void notifyJobManagerConnectionChanged(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderID) {
    --- End diff --
    
    This call can happen concurrently. You would have to synchronize the field writes.
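    
    One possible way to address this, sketched under assumptions rather than taken from the pull request: bundle the gateway and the leader ID into a single immutable object and publish it through an `AtomicReference`. Writing a single reference is atomic, but two separate field writes are not atomic as a pair, and without `volatile` or a lock other threads are not guaranteed to see them promptly. All class names below are hypothetical stand-ins.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Flink's JobMasterGateway.
interface JobMasterGatewayStub {}

// Immutable pair: a reader can never observe a new gateway with an old leader ID.
final class ConnectionSnapshot {
    final JobMasterGatewayStub gateway;
    final UUID leaderId;

    ConnectionSnapshot(JobMasterGatewayStub gateway, UUID leaderId) {
        this.gateway = Objects.requireNonNull(gateway);
        this.leaderId = Objects.requireNonNull(leaderId);
    }
}

// Hypothetical component whose connection details may be replaced by another thread.
final class ConnectionHolderSketch {
    private final AtomicReference<ConnectionSnapshot> current;

    ConnectionHolderSketch(JobMasterGatewayStub gateway, UUID leaderId) {
        this.current = new AtomicReference<>(new ConnectionSnapshot(gateway, leaderId));
    }

    // Replaces both values with one reference write that is visible to all threads.
    void notifyJobManagerConnectionChanged(JobMasterGatewayStub gateway, UUID leaderId) {
        current.set(new ConnectionSnapshot(gateway, leaderId));
    }

    // Callers read the snapshot once and use both fields from the same object.
    ConnectionSnapshot snapshot() {
        return current.get();
    }
}
```

    Marking the method `synchronized` and guarding every read with the same lock would be an alternative; the single-reference approach only avoids locking on the read path.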


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104132061
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobManagerConnectionTest.java ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
    +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
    +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.taskmanager.CheckpointResponder;
    +import org.apache.flink.runtime.taskmanager.TaskManagerActions;
    +import org.junit.Test;
    +
    +import java.util.UUID;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +public class JobManagerConnectionTest {
    +
    +	@Test
    +	public void testJobManagerUpdateListener() {
    --- End diff --
    
    It's always good to add a short comment describing what you're trying to test here.


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104135016
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java ---
    @@ -98,4 +103,15 @@ public ResultPartitionConsumableNotifier getResultPartitionConsumableNotifier()
     	public PartitionProducerStateChecker getPartitionStateChecker() {
     		return partitionStateChecker;
     	}
    +
    +	public void registerListener(JobManagerConnectionListener listener) {
    +		jobManagerConnectionListeners.add(listener);
    +	}
    +
    +	public void notifyListener(JobManagerConnection newJobManagerConnection) {
    +		for (JobManagerConnectionListener listener: jobManagerConnectionListeners) {
    +			listener.notifyJobManagerConnectionChanged(newJobManagerConnection.getJobManagerGateway(),
    +				newJobManagerConnection.getLeaderId());
    +		}
    +	}
    --- End diff --
    
    Why do you need this listener pattern here? `JobManagerConnection` already holds a reference to all except for the `InputSplitProvider` instances.


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104135429
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java ---
    @@ -98,4 +103,15 @@ public ResultPartitionConsumableNotifier getResultPartitionConsumableNotifier()
     	public PartitionProducerStateChecker getPartitionStateChecker() {
     		return partitionStateChecker;
     	}
    +
    +	public void registerListener(JobManagerConnectionListener listener) {
    +		jobManagerConnectionListeners.add(listener);
    +	}
    +
    +	public void notifyListener(JobManagerConnection newJobManagerConnection) {
    --- End diff --
    
    At the moment, if a `JobManager` loses leadership, the `JobManagerConnection` will be closed.  Thus, in case a new `JobManager` reconnects, you won't have the `listeners` anymore to notify them.
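    
    To illustrate the lifetime problem, here is a hedged sketch in which the listeners are kept by an object that outlives any single connection, so they are still registered when a new job manager connects. This is an assumption about one possible design, not what the pull request does. `ConnectionListenerSketch` and `ListenerRegistrySketch` are hypothetical names, and the gateway is reduced to an address string to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for the listener interface added in this pull request.
interface ConnectionListenerSketch {
    void notifyJobManagerConnectionChanged(String gatewayAddress, UUID leaderId);
}

// Hypothetical registry owned by a long-lived component rather than by the
// connection object, which is closed when the job manager loses leadership.
final class ListenerRegistrySketch {
    private final List<ConnectionListenerSketch> listeners = new ArrayList<>();

    synchronized void register(ConnectionListenerSketch listener) {
        listeners.add(listener);
    }

    // Called once a new job manager has connected. The list is copied under the
    // lock so that the callbacks themselves run without holding it.
    void connectionChanged(String gatewayAddress, UUID leaderId) {
        List<ConnectionListenerSketch> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(listeners);
        }
        for (ConnectionListenerSketch listener : snapshot) {
            listener.notifyJobManagerConnectionChanged(gatewayAddress, leaderId);
        }
    }
}
```

    Because the registry is not tied to a connection, a listener registered before the first connection is closed is still notified about every later one.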


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104130594
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java ---
    @@ -69,6 +73,7 @@ public JobManagerConnection(
     		this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager);
     		this.resultPartitionConsumableNotifier = Preconditions.checkNotNull(resultPartitionConsumableNotifier);
     		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
    +		this.jobManagerConnectionListeners = new LinkedList<>();
    --- End diff --
    
    Let's use an `ArrayList` here.


---

[GitHub] flink pull request #3395: [FLINK-5861] Components of TaskManager support upd...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3395#discussion_r104132101
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobManagerConnectionTest.java ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
    +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
    +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.taskmanager.CheckpointResponder;
    +import org.apache.flink.runtime.taskmanager.TaskManagerActions;
    +import org.junit.Test;
    +
    +import java.util.UUID;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +public class JobManagerConnectionTest {
    --- End diff --
    
    All test classes should extend the `TestLogger` class.


---