Posted to issues@flink.apache.org by yanghua <gi...@git.apache.org> on 2018/02/24 07:03:41 UTC

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/5573

    [FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient

    ## What is the purpose of the change
    
    This PR adds support for `ClusterClient.getAccumulators()` in `RestClusterClient`.
    
    ## Brief change log
    
      - *Send REST request to get `JobAccumulatorsInfo` object*
      - *Use Jackson's `ObjectMapper` to convert the `JobAccumulatorsInfo` object to a Map*
      - *Add a test method into `RestClusterClientTest` class to test the `getAccumulators` function*
      - *Add a test handler to mock `JobAccumulatorsInfo` object*
    
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Added a test that validates that the number of accumulators returned equals the number mocked in the test handler*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-8756

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5573
    
----
commit ec8ef5d8ad6d650e250737d5005173994337168c
Author: vinoyang <vi...@...>
Date:   2018-02-24T06:50:55Z

    [FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient

----


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172650031
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java ---
    @@ -33,19 +39,38 @@
     public class JobAccumulatorsInfo implements ResponseBody {
     	public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
     	public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";
    +	public static final String FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS = "serialized-user-task-accumulators";
     
     	@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
     	private List<JobAccumulator> jobAccumulators;
     
     	@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
     	private List<UserTaskAccumulator> userAccumulators;
     
    +	@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
    +	@JsonSerialize(contentUsing = SerializedValueSerializer.class)
    +	private Map<String, SerializedValue<Object>> serializedUserAccumulators;
    +
     	@JsonCreator
     	public JobAccumulatorsInfo(
     			@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
    -			@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators) {
    +			@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators,
    +			@JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<Object>> serializedUserAccumulators) {
     		this.jobAccumulators = Preconditions.checkNotNull(jobAccumulators);
     		this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
    +		this.serializedUserAccumulators = Preconditions.checkNotNull(serializedUserAccumulators);
    +	}
    +
    +	public List<JobAccumulator> getJobAccumulators() {
    --- End diff --
    
    missing `@JsonIgnore` annotations


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    I agree with @aljoscha. We could change the `JobAccumulatorsHandler` to return the serialized representation of the accumulator map instead of the stringified representation. We would then also have to adapt the web ui to work with this format.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175733976
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
    +		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    +	}
    +
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
    +		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
    +		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
    +		accMsgParams.jobPathParameter.resolve(jobID);
    +		accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
    +
    +		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
    +			accumulatorsHeaders,
    +			accMsgParams
    +		);
    +
    +		return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
    +			if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) {
    +				try {
    +					return AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(), loader);
    +				} catch (Exception e) {
    +					log.error("Deserialize accumulators with customized classloader error : {}", e);
    +				}
    +			}
    +
    +			return Collections.EMPTY_MAP;
    --- End diff --
    
    please replace with `Collections.<String, Object>emptyMap()`
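
For illustration, a minimal self-contained sketch of the difference between the raw constant and the typed factory method. The class and method names (`EmptyMapExample`, `rawEmpty`, `typedEmpty`) are hypothetical and not part of the PR; only `Collections.EMPTY_MAP` and `Collections.emptyMap()` come from the JDK.

```java
import java.util.Collections;
import java.util.Map;

final class EmptyMapExample {

    // Raw constant: returning it as Map<String, Object> is an unchecked
    // conversion, so the compiler warns unless the warning is suppressed.
    @SuppressWarnings("unchecked")
    static Map<String, Object> rawEmpty() {
        return Collections.EMPTY_MAP;
    }

    // Typed factory method: the explicit type arguments fix the key and
    // value types, so no raw type and no warning are involved.
    static Map<String, Object> typedEmpty() {
        return Collections.<String, Object>emptyMap();
    }

    public static void main(String[] args) {
        System.out.println(typedEmpty().isEmpty());
        System.out.println(rawEmpty().equals(typedEmpty()));
    }
}
```

Both methods return an immutable empty map; the typed variant simply keeps the generic types intact at the call site.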


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @tillrohrmann, if you have time, could you please review this PR? Thanks.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @aljoscha and @tillrohrmann, thanks for your suggestions. I will rework the PR accordingly.
    Also, @tillrohrmann, would you please review my other PR, [FLINK-8459](https://github.com/apache/flink/pull/5565)? Thanks!


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172648627
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +/**
    + * query parameter for job's accumulator handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
    + */
    +public class JobAccumulatorsQueryParameter extends MessageQueryParameter<String> {
    --- End diff --
    
    Why not directly make this a boolean option?
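
A rough sketch of what a boolean-typed option could look like. It is deliberately self-contained: `QueryParameter` is a hypothetical stand-in rather than Flink's `MessageQueryParameter`, and the key `includeSerializedValue` and all method names are assumptions made for illustration only.

```java
final class BooleanQueryParameterSketch {

    /** Hypothetical typed query parameter; a stand-in, not Flink's MessageQueryParameter. */
    abstract static class QueryParameter<X> {
        final String key;

        QueryParameter(String key) {
            this.key = key;
        }

        abstract X stringToValue(String raw);

        abstract String valueToString(X value);
    }

    /** Boolean-typed parameter: callers pass true/false instead of the strings "true"/"false". */
    static final class IncludeSerializedValue extends QueryParameter<Boolean> {

        IncludeSerializedValue() {
            super("includeSerializedValue"); // assumed key name
        }

        @Override
        Boolean stringToValue(String raw) {
            // Anything other than "true" (case-insensitive) parses as false.
            return Boolean.parseBoolean(raw);
        }

        @Override
        String valueToString(Boolean value) {
            return value.toString();
        }
    }

    public static void main(String[] args) {
        IncludeSerializedValue parameter = new IncludeSerializedValue();
        System.out.println(parameter.key + "=" + parameter.valueToString(true));
        System.out.println(parameter.stringToValue("true"));
    }
}
```

With a typed parameter the string conversion lives in one place, so client code cannot pass an arbitrary string by mistake.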


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175735579
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * request parameter for job accumulator's handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
    --- End diff --
    
    Capital letter


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175736041
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java ---
    @@ -36,6 +38,10 @@
     
     	private static final long serialVersionUID = 1L;
     
    +	public SerializedValueSerializer() {
    +		super(TypeFactory.defaultInstance().constructType(new TypeReference<SerializedValue<Object>>() {}));
    --- End diff --
    
    Same here with the default constructor.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    We may want to delay merging this until [FLINK-8881](https://issues.apache.org/jira/browse/FLINK-8881) has been addressed, as it voids the primary use-case of this handler.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @GJL, the earlier discussion between @aljoscha and me treated the accumulators as a whole (maybe that was only my reading). Because the existing implementation provides `getAccumulatorResultsStringified` and `getAccumulatorsSerialized`, I followed that path and split the stringified and serialized accumulators.
    
    So is the conclusion to accept your suggestion and put each key's `SerializedValue` into `UserTaskAccumulator`?
    
    If yes, I will try to change the code.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175745203
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java ---
    @@ -34,6 +36,10 @@
     
     	private static final long serialVersionUID = 1L;
     
    +	public SerializedValueDeserializer() {
    --- End diff --
    
    This is because `SerializedValueDeserializer` is used in the Jackson annotation `@JsonDeserialize`, which instantiates it through its no-argument constructor.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172651895
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +394,33 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID) throws Exception {
    +		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
    +		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
    +		accMsgParams.jobPathParameter.resolve(jobID);
    +		accMsgParams.queryParameter.resolve(Collections.singletonList("true"));
    +
    +		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
    +			accumulatorsHeaders,
    +			accMsgParams
    +		);
    +
    +		return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
    +			if (accumulatorsInfo != null) {
    +				Map<String, Object> result = new HashMap<>(3);
    +
    +				result.put(JobAccumulatorsInfo.FIELD_NAME_JOB_ACCUMULATORS, accumulatorsInfo.getJobAccumulators());
    --- End diff --
    
    The resulting API (as shown in `RestClusterClientTest`) is effectively not usable for users and inconsistent with existing behavior in `ClusterClient`.
    
    The returned map should only contain the deserialized accumulators with their respective name as the key.
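
As a sketch of the shape being asked for, the following self-contained example turns a map of serialized values into a map of deserialized values keyed by accumulator name. It uses plain Java serialization and `byte[]` as a stand-in for Flink's `SerializedValue`; the class and method names are hypothetical and this is not the PR's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

final class AccumulatorMapSketch {

    // Serialize one value with plain Java serialization
    // (byte[] stands in for SerializedValue here).
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Turn "name -> serialized bytes" into "name -> deserialized value".
    static Map<String, Object> deserializeAll(Map<String, byte[]> serialized)
            throws IOException, ClassNotFoundException {
        Map<String, Object> result = new HashMap<>(serialized.size());
        for (Map.Entry<String, byte[]> entry : serialized.entrySet()) {
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(entry.getValue()))) {
                result.put(entry.getKey(), in.readObject());
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        Map<String, byte[]> wire = new HashMap<>();
        wire.put("testKey", serialize("testValue"));
        System.out.println(deserializeAll(wire));
    }
}
```

The caller then sees only accumulator names and values, without any of the REST message field names.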


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @yanghua I will take a look this week.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175735166
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java ---
    @@ -62,18 +64,33 @@ public JobAccumulatorsHandler(
     	}
     
     	@Override
    -	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
    -		StringifiedAccumulatorResult[] accs = graph.getAccumulatorResultsStringified();
    -		List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(accs.length);
    +	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
    +		JobAccumulatorsInfo accumulatorsInfo;
    +		List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
     
    -		for (StringifiedAccumulatorResult acc : accs) {
    +		boolean includeSerializedValue = false;
    --- End diff --
    
    let's make it `final` and assign `false` in the else branch.
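
A small self-contained sketch of the suggested pattern, assuming the flag is derived from a possibly empty list of query-parameter values. The class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.List;

final class FinalFlagExample {

    // Returns the first query-parameter value, or false when none was given.
    static boolean includeSerializedValue(List<Boolean> queryParams) {
        // 'final' plus an explicit else branch: the compiler verifies that the
        // variable is assigned exactly once on every path.
        final boolean include;
        if (!queryParams.isEmpty()) {
            include = queryParams.get(0);
        } else {
            include = false;
        }
        return include;
    }

    public static void main(String[] args) {
        System.out.println(includeSerializedValue(Collections.singletonList(true)));
        System.out.println(includeSerializedValue(Collections.<Boolean>emptyList()));
    }
}
```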


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175735270
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +/**
    + * query parameter for job's accumulator handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
    --- End diff --
    
    Please start with a capital letter.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175734630
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
    @@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testGetAccumulators() throws Exception {
    +		TestAccumulatorHandlers accumulatorHandlers = new TestAccumulatorHandlers();
    +		TestAccumulatorHandlers.TestAccumulatorHandler accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
    +
    +		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
    +
    +			JobID id = new JobID();
    +
    +			{
    +				Map<String, Object> accumulators = restClusterClient.getAccumulators(id);
    +				assertNotNull(accumulators);
    +				assertEquals(1, accumulators.size());
    +
    +				assertEquals(true, accumulators.containsKey("testKey"));
    +				assertEquals("testValue", accumulators.get("testKey").toString());
    +			}
    +		}
    +	}
    +
    +	private class TestAccumulatorHandlers  {
    +
    +		private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
    +
    +			public TestAccumulatorHandler() {
    +				super(JobAccumulatorsHeaders.getInstance());
    +			}
    +
    +			@Override
    +			protected CompletableFuture<JobAccumulatorsInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody,
    +				JobAccumulatorsMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    +				JobAccumulatorsInfo accumulatorsInfo;
    +				List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
    +
    +				boolean includeSerializedValue = false;
    --- End diff --
    
    let's make it final and assign false in the else branch


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @zentol, it seems Travis CI has a problem; the build always fails. Please review my latest change.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    Hi @tillrohrmann and @aljoscha, which of you could review this PR? Thanks!


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @yanghua This test failure is unrelated, you can ignore it.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175736246
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
    +		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    +	}
    +
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
    +		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
    +		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
    +		accMsgParams.jobPathParameter.resolve(jobID);
    +		accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
    +
    +		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
    +			accumulatorsHeaders,
    +			accMsgParams
    +		);
    +
    +		return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
    +			if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) {
    --- End diff --
    
    `accumulatorsInfo` should never be null. The same applies to `getSerializedUserAccumulators`. Thus there is no need for a null check.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    I think it would be good to be able to query both representations. The stringified version can be good if you want to do a quick query using a rest client. And we don't have to change the Web UI if we keep both versions.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172755830
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID) throws Exception {
    --- End diff --
    
    we should also override `Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader)`


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @yanghua I'll have a look next week. From a first look it seems like we might have to extend the rest handler and the client to also return the accumulators as a `SerializedValue<>` as the old code for accumulators does, to allow user defined accumulator types.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @yanghua I mean having two separate ways of querying: the server should have two endpoints (or differentiate based on a query parameter) and return either the stringified accumulators or the serialized accumulators. For now, the former would be used for the web frontend while the latter would be used by the `RestClusterClient`. @GJL maybe has a better opinion on how this should be done on the server side, though.
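
One possible reading of the query-parameter variant, sketched without any Flink classes: the handler always returns the stringified values and adds the serialized ones only when the flag is set. `Response`, `handle`, and the field names are hypothetical, and `byte[]` again stands in for `SerializedValue`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

final class AccumulatorResponseSketch {

    /** Hypothetical response body holding both representations. */
    static final class Response {
        final Map<String, String> stringified;
        final Map<String, byte[]> serialized;

        Response(Map<String, String> stringified, Map<String, byte[]> serialized) {
            this.stringified = stringified;
            this.serialized = serialized;
        }
    }

    // Stringified values are always returned (e.g. for a web frontend);
    // serialized values are added only when the client asks for them.
    static Response handle(Map<String, Serializable> accumulators, boolean includeSerializedValue) {
        Map<String, String> stringified = new LinkedHashMap<>();
        Map<String, byte[]> serialized = new LinkedHashMap<>();
        for (Map.Entry<String, Serializable> entry : accumulators.entrySet()) {
            stringified.put(entry.getKey(), String.valueOf(entry.getValue()));
            if (includeSerializedValue) {
                serialized.put(entry.getKey(), toBytes(entry.getValue()));
            }
        }
        return new Response(stringified, serialized);
    }

    private static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Serializable> accumulators = new LinkedHashMap<>();
        accumulators.put("count", Integer.valueOf(42));
        System.out.println(handle(accumulators, false).stringified);
        System.out.println(handle(accumulators, true).serialized.keySet());
    }
}
```

Keeping the stringified output unchanged means an existing consumer of that format does not have to be adapted, while a client that needs typed values opts in through the flag.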


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175734525
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
    @@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testGetAccumulators() throws Exception {
    +		TestAccumulatorHandlers accumulatorHandlers = new TestAccumulatorHandlers();
    +		TestAccumulatorHandlers.TestAccumulatorHandler accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
    +
    +		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
    +
    +			JobID id = new JobID();
    +
    +			{
    +				Map<String, Object> accumulators = restClusterClient.getAccumulators(id);
    +				assertNotNull(accumulators);
    +				assertEquals(1, accumulators.size());
    +
    +				assertEquals(true, accumulators.containsKey("testKey"));
    +				assertEquals("testValue", accumulators.get("testKey").toString());
    +			}
    +		}
    +	}
    +
    +	private class TestAccumulatorHandlers  {
    +
    +		private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
    +
    +			public TestAccumulatorHandler() {
    +				super(JobAccumulatorsHeaders.getInstance());
    +			}
    +
    +			@Override
    +			protected CompletableFuture<JobAccumulatorsInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody,
    +				JobAccumulatorsMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    --- End diff --
    
    The line breaking of the parameters is not consistent.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r170448311
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -363,6 +367,27 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID) throws Exception {
    +		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
    +		final JobMessageParameters  params = new JobMessageParameters();
    +		params.jobPathParameter.resolve(jobID);
    +
    +		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
    +			accumulatorsHeaders,
    +			params
    +		);
    +
    +		return responseFuture.thenApply((accumulatorsInfo) -> {
    --- End diff --
    
    Adding types to lambdas is always a good idea.
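
For illustration, a minimal self-contained sketch of an explicitly typed lambda parameter in a `CompletableFuture` stage. The class and method names are hypothetical, and a plain `Map<String, Object>` stands in for the response type used in the PR.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class TypedLambdaExample {

    static CompletableFuture<Integer> countEntries(
            CompletableFuture<Map<String, Object>> response) {
        // Explicit parameter type: a reader sees what flows through this
        // stage without looking up the declaration of 'response'.
        return response.thenApply((Map<String, Object> accumulators) -> accumulators.size());
    }

    public static void main(String[] args) {
        Map<String, Object> accumulators =
                Collections.<String, Object>singletonMap("testKey", "testValue");
        System.out.println(countEntries(CompletableFuture.completedFuture(accumulators)).join());
    }
}
```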


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175734460
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
    @@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testGetAccumulators() throws Exception {
    +		TestAccumulatorHandlers accumulatorHandlers = new TestAccumulatorHandlers();
    +		TestAccumulatorHandlers.TestAccumulatorHandler accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
    +
    +		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
    +
    +			JobID id = new JobID();
    +
    +			{
    +				Map<String, Object> accumulators = restClusterClient.getAccumulators(id);
    +				assertNotNull(accumulators);
    +				assertEquals(1, accumulators.size());
    +
    +				assertEquals(true, accumulators.containsKey("testKey"));
    +				assertEquals("testValue", accumulators.get("testKey").toString());
    +			}
    +		}
    +	}
    +
    +	private class TestAccumulatorHandlers  {
    --- End diff --
    
    Why have this extra wrapping class?


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    Changes look good @yanghua. Merging this PR.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172648696
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsQueryParameter.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +/**
    + * query parameter for job's accumulator handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
    + */
    +public class JobAccumulatorsQueryParameter extends MessageQueryParameter<String> {
    --- End diff --
    
    The name is also a bit generic.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    Not sure what exactly happened but you can always go back to an earlier
    point via the reflog and then a git hard reset.
    
    On Tue, Mar 20, 2018 at 4:11 PM, vinoyang <no...@github.com> wrote:
    
    > hi @tillrohrmann <https://github.com/tillrohrmann> it seems I make a
    > wrong git operation, I merged some new commits and squashed into one, then
    > pushed into the PR(branch). What should I do to fix this problem?



---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172756524
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID) throws Exception {
    +		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
    +		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
    +		accMsgParams.jobPathParameter.resolve(jobID);
    +		accMsgParams.queryParameter.resolve(Collections.singletonList(true));
    +
    +		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
    +			accumulatorsHeaders,
    +			accMsgParams
    +		);
    +
    +		return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
    +			if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) {
    +				return accumulatorsInfo.getSerializedUserAccumulators();
    --- End diff --
    
    The accumulators should be deserialized via `SerializedValue#deserialize(ClassLoader)`.
    
    If `Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader)` (which should also be overridden) was called, use the passed-in `ClassLoader`; otherwise use `ClassLoader.getSystemClassLoader()`.
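
The delegation described above could look roughly like the following. This is only a sketch under stated assumptions: it uses plain Java serialization rather than Flink's `SerializedValue`, and `AccumulatorDecodingSketch`, `LoaderAwareInputStream`, `serialize` and `deserializeAll` are hypothetical names introduced for illustration. The one-argument overload falls back to the system class loader, while the two-argument overload resolves classes through the loader that was passed in.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink.
class AccumulatorDecodingSketch {

    // ObjectInputStream that resolves classes through a caller-supplied class loader.
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution (covers primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    // Serializes a value with standard Java serialization.
    static byte[] serialize(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize value.", e);
        }
    }

    // No loader given: delegate with the system class loader.
    static Map<String, Object> deserializeAll(Map<String, byte[]> serialized) {
        return deserializeAll(serialized, ClassLoader.getSystemClassLoader());
    }

    // Loader given: use it to resolve the classes of the deserialized values.
    static Map<String, Object> deserializeAll(Map<String, byte[]> serialized, ClassLoader loader) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, byte[]> entry : serialized.entrySet()) {
            try (ObjectInputStream in = new LoaderAwareInputStream(
                    new ByteArrayInputStream(entry.getValue()), loader)) {
                result.put(entry.getKey(), in.readObject());
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(
                    "Could not deserialize accumulator " + entry.getKey(), e);
            }
        }
        return result;
    }
}
```

Keeping the actual work in the two-argument method means the default-loader variant is a one-line delegation and cannot drift from it.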


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175733933
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
    +		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    +	}
    +
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
    +		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
    +		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
    +		accMsgParams.jobPathParameter.resolve(jobID);
    +		accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
    +
    +		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
    +			accumulatorsHeaders,
    +			accMsgParams
    +		);
    +
    +		return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
    +			if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) {
    +				try {
    +					return AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(), loader);
    +				} catch (Exception e) {
    +					log.error("Deserialize accumulators with customized classloader error : {}", e);
    --- End diff --
    
    Moreover, when logging an exception you don't have to specify a `{}` placeholder.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @tillrohrmann thanks for your reply. It was a mistake on my side; I have rolled back and refactored the code based on your suggestions. Please check the changes again.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    You can use `git commit -m "Trigger build" --allow-empty` to trigger a build.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
     @GJL I have refactored the code and test case, could you please review the new commit? Thanks.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    I agree with @aljoscha. We should not add more `public` methods to `ClusterClient`. Implementing `getAccumulators()` in `RestClusterClient` is enough. 
    
    Regarding the `JobAccumulatorsHandler`, what would speak against adding a new field to `UserTaskAccumulator` which stores the `SerializedValue`? The JSON representation would always carry an additional base64 encoded `byte` array but I think performance isn't important at this point.
    
    cc: @tillrohrmann @aljoscha 


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172756640
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java ---
    @@ -0,0 +1,35 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * request parameter for job accumulator's handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
    + */
    +public class JobAccumulatorsMessageParameters extends JobMessageParameters {
    +
    +	public final AccumulatorsIncludeSerializedValueQueryParameter queryParameter = new AccumulatorsIncludeSerializedValueQueryParameter();
    --- End diff --
    
    field name is a bit generic; how about `includeSerializedAccumulators`?


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @yanghua Take a look at `SerializedValueDeserializer` and `SerializedValueSerializer`. You can use `@JsonSerialize(using= ... )`, `@JsonSerialize(keyUsing= ... )`, and `@JsonDeserialize` to specify the class for ser/des.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @aljoscha yes, I agree.
    
    > I think it would be good to be able to **query** both representations
    
    By "query", do you mean the behavior of `JobAccumulatorsHandler` or of `RestClusterClient`?
    
    In the `JobAccumulatorsHandler#handleRequest` method, we could query both the string and the serialized representations of the accumulators and wrap them in a `JobAccumulatorsInfo` object. I see two options:
    
    1. Let `getAccumulators` return the string representations and let a `getSerializedAccumulators` method return the serialized representations.
    
    2. Let `getAccumulators` return both representations.
    
    Which one did you have in mind?


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r172648802
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java ---
    @@ -104,6 +129,18 @@ public UserTaskAccumulator(
     			this.value = Preconditions.checkNotNull(value);
     		}
     
    +		public String getName() {
    --- End diff --
    
    Missing `@JsonIgnore` annotation (this also applies to the other getters).


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @yanghua 
    I talked to @tillrohrmann offline, and we decided it is enough to add a query parameter such as `includeSerializedValue` to the `JobAccumulatorsHandler`. If `includeSerializedValue` is `true`, then `SerializedValue` should be part of the JSON response (in addition to the stringified value), otherwise only the stringified value. By default `includeSerializedValue` should be `false` because the Web UI cannot handle binary data.  For the request in `RestClusterClient` you would always set the flag to `true`.
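
To make the agreed behavior concrete, here is a small sketch of how such a flag could shape one accumulator entry of the response. It is an assumption-laden illustration, not Flink's `JobAccumulatorsHandler`: the class `AccumulatorResponseSketch`, the methods `parseFlag` and `buildEntry`, and the field name `serializedValue` are hypothetical, and the Base64 encoding follows the earlier remark in this thread that the JSON would carry a base64-encoded byte array.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the handler logic; not Flink code.
class AccumulatorResponseSketch {

    // An absent query parameter means "false", so the Web UI never receives binary data.
    static boolean parseFlag(String rawQueryValue) {
        return rawQueryValue != null && Boolean.parseBoolean(rawQueryValue);
    }

    // Builds the fields of one accumulator entry of the JSON response.
    static Map<String, String> buildEntry(
            String name, Object value, byte[] serializedValue, boolean includeSerializedValue) {
        Map<String, String> entry = new LinkedHashMap<>();
        entry.put("name", name);
        // The stringified value is always part of the response.
        entry.put("value", String.valueOf(value));
        if (includeSerializedValue) {
            // The serialized bytes are added only on request, Base64-encoded for JSON.
            entry.put("serializedValue", Base64.getEncoder().encodeToString(serializedValue));
        }
        return entry;
    }
}
```

A client such as `RestClusterClient` would always send the flag as `true`, while a request without the parameter gets only the stringified value.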
    
    Let me know if you have any questions.
    
    cc: @tillrohrmann 
    
    
    



---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    @GJL thanks for your guidance!
    @aljoscha and @tillrohrmann I have reworked this PR: I kept the `getAccumulators` method and added a `getSerializedAccumulators` method to `ClusterClient` that returns the serialized accumulators, and I overrode `getSerializedAccumulators` in `RestClusterClient`. For `JobAccumulatorsInfo`, I also kept `UserTaskAccumulator` and just added a new property for the serialized accumulators. Please review, thanks.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    Hi @zentol, I have refactored the code based on your review suggestions. Would you please review it again? Thanks!


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    I can't comment on the first points, only on the last one. I would suggest adding only a `getAccumulators()` to `RestClusterClient` that behaves the same way as the existing `getAccumulators()` on `ClusterClient`: it retrieves the serialised values and returns a map of the deserialised values.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    please do not trigger builds just to get that perfect green build. The test failure here is quite common at the moment.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175750002
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java ---
    @@ -34,6 +36,10 @@
     
     	private static final long serialVersionUID = 1L;
     
    +	public SerializedValueDeserializer() {
    --- End diff --
    
    Alright, this makes sense.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175733724
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
    +		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    +	}
    +
    +	@Override
    +	public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
    +		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
    +		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
    +		accMsgParams.jobPathParameter.resolve(jobID);
    +		accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
    +
    +		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
    +			accumulatorsHeaders,
    +			accMsgParams
    +		);
    +
    +		return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
    +			if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) {
    +				try {
    +					return AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(), loader);
    +				} catch (Exception e) {
    +					log.error("Deserialize accumulators with customized classloader error : {}", e);
    --- End diff --
    
    Let us properly fail with an exception here.
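
One way to fail properly inside a `thenApply` stage is to rethrow the checked exception wrapped in a `CompletionException`, so that the returned future completes exceptionally instead of the error only being logged. The sketch below illustrates that pattern with plain Java serialization; `FailFastFutureSketch`, `decode` and `decodeAsync` are hypothetical names, and this is not the code that was eventually merged.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration; not Flink code.
class FailFastFutureSketch {

    // Decoding step that can fail with checked exceptions.
    static Object decode(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    // Propagates decoding failures through the future instead of swallowing them.
    static CompletableFuture<Object> decodeAsync(CompletableFuture<byte[]> response) {
        return response.thenApply(bytes -> {
            try {
                return decode(bytes);
            } catch (IOException | ClassNotFoundException e) {
                // The lambda cannot throw checked exceptions, so wrap the cause.
                throw new CompletionException("Cannot deserialize accumulators.", e);
            }
        });
    }
}
```

A caller that blocks on the result (for example via `join()`) then sees the failure, with the original exception available as the cause.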


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175735982
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java ---
    @@ -34,6 +36,10 @@
     
     	private static final long serialVersionUID = 1L;
     
    +	public SerializedValueDeserializer() {
    --- End diff --
    
    Why do we add this default constructor? I prefer to specify the type explicitly, even if it is a simple `TypeReference<SerializedValue<Object>>`.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175734974
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
    @@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testGetAccumulators() throws Exception {
    +		TestAccumulatorHandlers accumulatorHandlers = new TestAccumulatorHandlers();
    +		TestAccumulatorHandlers.TestAccumulatorHandler accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
    +
    +		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
    +
    +			JobID id = new JobID();
    +
    +			{
    +				Map<String, Object> accumulators = restClusterClient.getAccumulators(id);
    +				assertNotNull(accumulators);
    +				assertEquals(1, accumulators.size());
    +
    +				assertEquals(true, accumulators.containsKey("testKey"));
    +				assertEquals("testValue", accumulators.get("testKey").toString());
    +			}
    +		}
    +	}
    +
    +	private class TestAccumulatorHandlers  {
    +
    +		private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
    +
    +			public TestAccumulatorHandler() {
    +				super(JobAccumulatorsHeaders.getInstance());
    +			}
    +
    +			@Override
    +			protected CompletableFuture<JobAccumulatorsInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody,
    +				JobAccumulatorsMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    +				JobAccumulatorsInfo accumulatorsInfo;
    +				List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
    +
    +				boolean includeSerializedValue = false;
    +				if (!queryParams.isEmpty()) {
    +					includeSerializedValue = queryParams.get(0);
    +				}
    +
    +				List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<JobAccumulatorsInfo.UserTaskAccumulator>() {{
    --- End diff --
    
    Instead of creating a new anonymous class can we simply create an `ArrayList` and then add the element (without the initializer block)?
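
For illustration, the difference between the two styles can be shown with standard collections only. The class `ListInitSketch` and its method names are hypothetical; the element value mirrors the `testKey` used in the test above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two initialization styles.
class ListInitSketch {

    // "Double-brace" initialization: creates an anonymous subclass of ArrayList
    // whose instance initializer block adds the element.
    static List<String> withInitializerBlock() {
        return new ArrayList<String>() {{
            add("testKey");
        }};
    }

    // Plain style: a regular ArrayList, populated with an ordinary add() call.
    static List<String> plain() {
        List<String> list = new ArrayList<>();
        list.add("testKey");
        return list;
    }
}
```

Both lists hold the same element, but only the second one is an actual `ArrayList` instance rather than an extra anonymous class.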


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175733195
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto
     		});
     	}
     
    +	@Override
    +	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
    +		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    +	}
    --- End diff --
    
    Should not be necessary to override.


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    Hi @tillrohrmann, it seems I made a wrong git operation: I merged some new commits, squashed them into one, and then pushed that to the PR branch. What should I do to fix this problem?


---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    OK, based on @aljoscha's opinion, here is the implementation plan before I start coding:
    
    * Define an `AccumulatorRepresentationsQueryParameter` class which extends `MessageQueryParameter` and supports two representation modes (***stringified*** and ***serialized***).
    
    * Define a `JobAccumulatorsMessageParameters` class which extends `MessageParameters`; its `getQueryParameters` method will return the `AccumulatorRepresentationsQueryParameter`.
    
    * Refactor `JobAccumulatorsHeaders#getUnresolvedMessageParameters` to return a `JobAccumulatorsMessageParameters` instance.
    
    * Refactor `JobAccumulatorsHandler#handleRequest` so that it queries the accumulator representation requested via `AccumulatorRepresentationsQueryParameter`; `JobAccumulatorsInfo` will be reused for both representations.
    
    * In the `RestClusterClient` class, let `getAccumulators` return the stringified accumulators (`Map<String, Object>`) and `getSerializedAccumulators` return the serialized accumulators (`Map<String, SerializedValue<Object>>`).
    
    @tillrohrmann and @GJL, I would appreciate your opinions. Thanks.



---

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5573
  
    Hi @aljoscha and @tillrohrmann, I have tried to put a `Map<String, SerializedValue<Object>>` into `JobAccumulatorsInfo` in two ways:
    * as a `@JsonProperty`
    * marked with `@JsonIgnore`, with a getter and setter provided
    
    Both approaches failed, for different reasons:
    
    * First approach: an exception is thrown because `SerializedValue` has no suitable constructor.
    * Second approach: the value that was set is ignored by `HandlerUtils#sendResponse`.
    
    In `JobAccumulatorsHandler#handleRequest` I cannot return a `Map<String, SerializedValue<Object>>` directly, because this method's result type must extend `ResponseBody`.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5573#discussion_r175741738
  
    --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
    @@ -542,6 +549,68 @@ public void testListJobs() throws Exception {
     		}
     	}
     
    +	@Test
    +	public void testGetAccumulators() throws Exception {
    +		TestAccumulatorHandlers accumulatorHandlers = new TestAccumulatorHandlers();
    +		TestAccumulatorHandlers.TestAccumulatorHandler accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler();
    +
    +		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
    +
    +			JobID id = new JobID();
    +
    +			{
    +				Map<String, Object> accumulators = restClusterClient.getAccumulators(id);
    +				assertNotNull(accumulators);
    +				assertEquals(1, accumulators.size());
    +
    +				assertEquals(true, accumulators.containsKey("testKey"));
    +				assertEquals("testValue", accumulators.get("testKey").toString());
    +			}
    +		}
    +	}
    +
    +	private class TestAccumulatorHandlers  {
    --- End diff --
    
    this follows the `TestSavepointHandlers` in `RestClusterClientTest`, I will remove it.


---

[GitHub] flink pull request #5573: [FLINK-8756][Client] Support ClusterClient.getAccu...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5573


---