Posted to issues@flink.apache.org by zjureel <gi...@git.apache.org> on 2017/07/20 07:09:45 UTC

[GitHub] flink pull request #4376: [FLINK-6521] Add per job cleanup methods to HighAv...

GitHub user zjureel opened a pull request:

    https://github.com/apache/flink/pull/4376

    [FLINK-6521] Add per job cleanup methods to HighAvailabilityServices

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6521

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4376.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4376
    
----
commit 0290f34f82cf5eba6bfabbb785037cf1507600b4
Author: zjureel <zj...@gmail.com>
Date:   2017-07-12T05:33:08Z

    add cleanupData(JobID jobID) in HighAvailabilityServices

commit 7e9860e526bb33fbca6ffbb1822d9a6d945ceb6f
Author: zjureel <zj...@gmail.com>
Date:   2017-07-19T06:32:10Z

    add test case

commit e3fd500cae0e85c5f3c12f728ccde8ec95729020
Author: zjureel <zj...@gmail.com>
Date:   2017-07-20T07:05:29Z

    add test case

----
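The first commit adds `cleanupData(JobID jobID)` to `HighAvailabilityServices`, and the added test (`testCleanSubmittedJobGraphStore`) stores two job graphs, cleans up one of them, and expects the other to still be recoverable. The following is a minimal, self-contained sketch of those per-job cleanup semantics using an in-memory map. `InMemoryHaServices`, `containsJob`, `cleanupAllData`, and the use of `String` job IDs are hypothetical stand-ins chosen for illustration, not Flink's API or the PR's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory stand-in for HA services that keep per-job state.
 * Not Flink's implementation; it only illustrates per-job cleanup semantics.
 */
public class InMemoryHaServices {

    // Job-scoped data keyed by job ID (a String here instead of Flink's JobID).
    private final Map<String, byte[]> jobGraphs = new ConcurrentHashMap<>();

    /** Stores (or replaces) the serialized job graph for the given job. */
    public void putJobGraph(String jobId, byte[] serializedJobGraph) {
        jobGraphs.put(jobId, serializedJobGraph);
    }

    /** Returns true if data for the given job is still stored. */
    public boolean containsJob(String jobId) {
        return jobGraphs.containsKey(jobId);
    }

    /** Removes only the data that belongs to the given job; other jobs are untouched. */
    public void cleanupData(String jobId) {
        jobGraphs.remove(jobId);
    }

    /** Removes the data of all jobs (similar in spirit to closeAndCleanupAllData()). */
    public void cleanupAllData() {
        jobGraphs.clear();
    }

    public static void main(String[] args) {
        InMemoryHaServices services = new InMemoryHaServices();
        services.putJobGraph("job-1", new byte[] {1});
        services.putJobGraph("job-2", new byte[] {2});

        // Clean up one job; the other job's data must survive.
        services.cleanupData("job-1");

        System.out.println(services.containsJob("job-1")); // false
        System.out.println(services.containsJob("job-2")); // true
    }
}
```

The key design point is that per-job cleanup is scoped by job ID, whereas whole-cluster cleanup removes everything.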


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4376: [FLINK-6521] Add per job cleanup methods to HighAv...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4376#discussion_r130052278
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/HighAvailabilityServiceTest.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.highavailability.zookeeper;
    +
    +import akka.actor.ActorRef;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.runtime.akka.ListeningBehaviour;
    +import org.apache.flink.runtime.blob.FileSystemBlobStore;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.JobInfo;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.junit.Assert;
    +import org.junit.rules.ExpectedException;
    +
    +public class HighAvailabilityServiceTest {
    --- End diff --
    
    If we let the test case extend from `TestLogger`, then we get nice test log statements on Travis.



[GitHub] flink pull request #4376: [FLINK-6521] Add per job cleanup methods to HighAv...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4376#discussion_r130052071
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java ---
    @@ -113,6 +116,11 @@ public ZooKeeperHaServices(
     		this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
     
     		this.blobStoreService = checkNotNull(blobStoreService);
    +		try {
    +			this.submittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
    +		} catch (Exception e) {
    +			throw new RuntimeException(e);
    --- End diff --
    
    We should not throw a `RuntimeException` here but rather a meaningful checked exception.
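One way to follow this suggestion is to let the constructor declare a dedicated checked exception and wrap the original failure as its cause. The sketch below is self-contained and every name in it is hypothetical: `HaServicesInitializationException`, `StoreFactory`, and `HaServices` only mimic the shape of the `ZooKeeperHaServices` constructor and the `ZooKeeperUtils.createSubmittedJobGraphs(...)` call; they are not Flink classes.

```java
/**
 * Sketch of wrapping a construction failure in a dedicated checked exception
 * instead of a RuntimeException. All names below are hypothetical.
 */
public class CheckedExceptionSketch {

    /** Hypothetical checked exception signalling that HA services could not be set up. */
    public static class HaServicesInitializationException extends Exception {
        public HaServicesInitializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical factory standing in for a call that may throw, e.g. creating a job graph store. */
    @FunctionalInterface
    public interface StoreFactory<T> {
        T create() throws Exception;
    }

    /** Hypothetical services class whose constructor declares the checked exception. */
    public static class HaServices<T> {
        private final T submittedJobGraphStore;

        public HaServices(StoreFactory<T> storeFactory) throws HaServicesInitializationException {
            try {
                this.submittedJobGraphStore = storeFactory.create();
            } catch (Exception e) {
                // Keep the original exception as the cause; the compiler now forces callers to handle it.
                throw new HaServicesInitializationException(
                        "Could not create the submitted job graph store.", e);
            }
        }

        public T getSubmittedJobGraphStore() {
            return submittedJobGraphStore;
        }
    }

    public static void main(String[] args) {
        try {
            new HaServices<String>(() -> {
                throw new java.io.IOException("ZooKeeper not reachable");
            });
        } catch (HaServicesInitializationException e) {
            // Callers receive a typed exception with the root cause attached.
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
        }
    }
}
```

Compared with `new RuntimeException(e)`, the checked type documents the failure mode in the constructor signature and keeps the root cause available to callers.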



[GitHub] flink issue #4376: [FLINK-6521] Add per job cleanup methods to HighAvailabil...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4376
  
    The test case `JobManagerHACheckpointRecoveryITCase.testCheckpointedStreamingProgramIncrementalRocksDB` seems to be failing on Travis. It might be caused by these changes.



[GitHub] flink issue #4376: [FLINK-6521] Add per job cleanup methods to HighAvailabil...

Posted by zjureel <gi...@git.apache.org>.
Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/4376
  
    I found the following in the CI log, and it does not seem related to this issue. What do you think, @tillrohrmann?
    ```
    Running org.apache.flink.test.recovery.JobManagerHACheckpointRecoveryITCase
    java.lang.RuntimeException: org.apache.zookeeper.server.ZooKeeperServer class is frozen
    	at javassist.CtClassType.checkModify(CtClassType.java:288)
    	at javassist.CtBehavior.setBody(CtBehavior.java:432)
    	at javassist.CtBehavior.setBody(CtBehavior.java:412)
    	at org.apache.curator.test.ByteCodeRewrite.fixMethods(ByteCodeRewrite.java:91)
    	at org.apache.curator.test.ByteCodeRewrite.<clinit>(ByteCodeRewrite.java:50)
    	at org.apache.curator.test.TestingServer.<clinit>(TestingServer.java:33)
    	at org.apache.flink.test.recovery.JobManagerHACheckpointRecoveryITCase.testCheckpointedStreamingProgram(JobManagerHACheckpointRecoveryITCase.java:350)
    	at org.apache.flink.test.recovery.JobManagerHACheckpointRecoveryITCase.testCheckpointedStreamingProgramIncrementalRocksDB(JobManagerHACheckpointRecoveryITCase.java:336)
    ```



[GitHub] flink pull request #4376: [FLINK-6521] Add per job cleanup methods to HighAv...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4376#discussion_r130052646
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/HighAvailabilityServiceTest.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.highavailability.zookeeper;
    +
    +import akka.actor.ActorRef;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.runtime.akka.ListeningBehaviour;
    +import org.apache.flink.runtime.blob.FileSystemBlobStore;
    +import org.apache.flink.runtime.concurrent.Executors;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.JobInfo;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.util.ZooKeeperUtils;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.junit.Assert;
    +import org.junit.rules.ExpectedException;
    +
    +public class HighAvailabilityServiceTest {
    +	private TestingServer testingServer;
    +	private HighAvailabilityServices zkHaService;
    +	private SubmittedJobGraphStore submittedJobGraphStore;
    +
    +	@Rule
    +	public ExpectedException thrown = ExpectedException.none();
    +
    +	@Before
    +	public void before() throws Exception {
    +		testingServer = new TestingServer();
    +
    +		Configuration configuration = new Configuration();
    +		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
    +		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "temp");
    +		zkHaService = new ZooKeeperHaServices(
    +						ZooKeeperUtils.startCuratorFramework(configuration),
    +						Executors.directExecutor(),
    +						configuration,
    +						new FileSystemBlobStore(FileSystem.getLocalFileSystem(), "local"));
    +
    +		submittedJobGraphStore = zkHaService.getSubmittedJobGraphStore();
    +		submittedJobGraphStore.start(new SubmittedJobGraphStore.SubmittedJobGraphListener() {
    +			@Override
    +			public void onAddedJobGraph(JobID jobId) {
    +
    +			}
    +
    +			@Override
    +			public void onRemovedJobGraph(JobID jobId) {
    +
    +			}
    +		});
    +	}
    +
    +	@After
    +	public void after() throws Exception {
    +		testingServer.stop();
    +		testingServer = null;
    +
    +		submittedJobGraphStore.stop();
    +		submittedJobGraphStore = null;
    +
    +		zkHaService.closeAndCleanupAllData();
    +		zkHaService = null;
    +	}
    +
    +	/**
    +	 * Tests the effect of cleanupData(JobID) on the SubmittedJobGraphStore.
    +	 */
    +	@Test
    +	public void testCleanSubmittedJobGraphStore() throws Exception {
    +		SubmittedJobGraph jobGraph1 = new SubmittedJobGraph(
    +						new JobGraph("testSubmittedJob1"),
    +						new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
    +		SubmittedJobGraph jobGraph2 = new SubmittedJobGraph(
    +						new JobGraph("testSubmittedJob2"),
    +						new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, 0, Integer.MAX_VALUE));
    +		submittedJobGraphStore.putJobGraph(jobGraph1);
    +		submittedJobGraphStore.putJobGraph(jobGraph2);
    +
    +		zkHaService.cleanupData(jobGraph1.getJobId());
    +
    +		SubmittedJobGraph recoverJobGraph2 = submittedJobGraphStore.recoverJobGraph(jobGraph2.getJobId());
    +		Assert.assertEquals(recoverJobGraph2.getJobId(), jobGraph2.getJobId());
    +		thrown.expectMessage("Could not retrieve the submitted job graph state handle for /" +
    --- End diff --
    
    Could we check for the exception type instead? Matching exception messages is really brittle.
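With JUnit 4's `ExpectedException` rule, this could be `thrown.expect(SomeException.class)` instead of `thrown.expectMessage(...)`; which exception type `recoverJobGraph` actually throws is not shown in the diff. The sketch below illustrates the same idea without JUnit: a small `expectThrows` helper asserts only on the exception type. `JobGraphNotFoundException`, `JobGraphStore`, and `expectThrows` are hypothetical names used for illustration, not Flink or JUnit APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: assert on the exception type rather than its message text. All names are hypothetical. */
public class ExceptionTypeCheckSketch {

    /** Hypothetical exception type a store could throw for an unknown job. */
    public static class JobGraphNotFoundException extends Exception {
        public JobGraphNotFoundException(String jobId) {
            super("No job graph stored for job " + jobId);
        }
    }

    /** Hypothetical minimal job graph store. */
    public static class JobGraphStore {
        private final Map<String, String> graphs = new HashMap<>();

        public void put(String jobId, String graph) {
            graphs.put(jobId, graph);
        }

        public void remove(String jobId) {
            graphs.remove(jobId);
        }

        public String recover(String jobId) throws JobGraphNotFoundException {
            String graph = graphs.get(jobId);
            if (graph == null) {
                throw new JobGraphNotFoundException(jobId);
            }
            return graph;
        }
    }

    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Throwable;
    }

    /** Runs the action and returns the thrown exception if it has the expected type; fails otherwise. */
    public static <T extends Throwable> T expectThrows(Class<T> expectedType, ThrowingRunnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expectedType.isInstance(t)) {
                return expectedType.cast(t);
            }
            throw new AssertionError(
                    "Expected " + expectedType.getName() + " but got " + t.getClass().getName(), t);
        }
        throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
    }

    public static void main(String[] args) throws Exception {
        JobGraphStore store = new JobGraphStore();
        store.put("job-1", "graph-1");
        store.put("job-2", "graph-2");
        store.remove("job-1");

        // Only the exception type is checked, so rewording the message cannot break the test.
        expectThrows(JobGraphNotFoundException.class, () -> store.recover("job-1"));
        System.out.println(store.recover("job-2"));
    }
}
```

Asserting on a type keeps the test tied to the store's contract rather than to the wording of a log-style message.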

