Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/03/18 16:31:53 UTC

[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1818

    [FLINK-3633] Fix user code de/serialization in ExecutionConfig

    FLINK-3327 moved the ExecutionConfig directly to the JobGraph so that it was serialized
    and deserialized using the system class loader when sending a SubmitJob message to the
    JobManager. This is problematic since the ExecutionConfig can contain user code classes
    which require a user code class loader for deserialization. In order to circumvent the
    problem, a UserCodeValue class was introduced which automatically sends the wrapped value
    as a byte array. On the receiving side, the wrapped value has to be explicitly deserialized
    providing a class loader.
    
    To test the feature, ScalaShellITCase.testSubmissionOfExternalLibrary was adapted
    to register org.apache.flink.ml.math.Vector with the ExecutionConfig.
    
    This commit also re-introduces the removed ExecutionConfig.CONFIG_KEY key, so that
    version 1.1 does not break the API.
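
The wrapper idea described above can be illustrated with a minimal, self-contained sketch. It is not the `UserCodeValue` class from this pull request: the names `UserCodeValueSketch`, `Wrapper`, `ClassLoaderObjectInputStream`, `deserializeValue` and `shipOverTheWire` are assumptions made for illustration. The sketch only shows the general technique: the wrapped value is transient, a `writeObject` hook turns it into a byte array when the wrapper is serialized, and the receiver has to unwrap it explicitly with a class loader of its choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.LinkedHashSet;

// Illustrative sketch only; all names here are hypothetical.
final class UserCodeValueSketch {

    /** Travels as a byte array; must be unwrapped with an explicit class loader. */
    static final class Wrapper<T extends Serializable> implements Serializable {
        private static final long serialVersionUID = 1L;

        private transient T value;      // skipped by default serialization
        private byte[] serializedValue; // the only state that is shipped

        Wrapper(T value) {
            this.value = value;
        }

        // Java serialization hook: convert the value to bytes before writing.
        private void writeObject(ObjectOutputStream out) throws IOException {
            if (value != null) {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                    oos.writeObject(value);
                }
                serializedValue = bytes.toByteArray();
            }
            out.defaultWriteObject();
        }

        boolean isDeserialized() {
            return value != null;
        }

        /** Explicit step on the receiving side. */
        T deserializeValue(ClassLoader userCodeClassLoader)
                throws IOException, ClassNotFoundException {
            if (value == null && serializedValue != null) {
                try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                        new ByteArrayInputStream(serializedValue), userCodeClassLoader)) {
                    @SuppressWarnings("unchecked")
                    T result = (T) in.readObject();
                    value = result;
                }
            }
            return value;
        }
    }

    /** ObjectInputStream that first tries to resolve classes with the given loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                return super.resolveClass(desc); // handles primitives such as int
            }
        }
    }

    /** Round trip through plain Java serialization, standing in for a message transport. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> Wrapper<T> shipOverTheWire(Wrapper<T> wrapper)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(wrapper);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Wrapper<T>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LinkedHashSet<String> registered = new LinkedHashSet<>();
        registered.add("org.example.UserType"); // hypothetical entry
        Wrapper<LinkedHashSet<String>> sent = new Wrapper<>(registered);
        Wrapper<LinkedHashSet<String>> received = shipOverTheWire(sent);
        System.out.println("usable right after transport: " + received.isDeserialized());
        System.out.println("after the explicit step: "
                + received.deserializeValue(UserCodeValueSketch.class.getClassLoader()));
    }
}
```

In this sketch the transport never needs to resolve the wrapped classes itself, because it only sees a `byte[]`. The cost is that every consumer has to remember the explicit `deserializeValue` call before reading the value.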

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixJobSubmission

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1818
    
----
commit 7abef1b685d76a5249ddeb0fcc4f1190b541ba57
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-03-18T15:22:04Z

    [FLINK-3633] Fix user code de/serialization in ExecutionConfig
    
    FLINK-3327 moved the ExecutionConfig directly to the JobGraph so that it was serialized
    and deserialized using the system class loader when sending a SubmitJob message to the
    JobManager. This is problematic since the ExecutionConfig can contain user code classes
    which require a user code class loader for deserialization. In order to circumvent the
    problem, a UserCodeValue class was introduced which automatically sends the wrapped value
    as a byte array. On the receiving side, the wrapped value has to be explicitly deserialized
    providing a class loader.
    
    To test the feature, ScalaShellITCase.testSubmissionOfExternalLibrary was adapted
    to register org.apache.flink.ml.math.Vector with the ExecutionConfig.
    
    This commit also re-introduces the removed ExecutionConfig.CONFIG_KEY key, so that
    version 1.1 does not break the API.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-200830880
  
    The other programs that are executed in `ClassLoaderITCase` are contained in the `org.apache.flink.test.classloading.jar` package (search for `KMeansForTest` for example). They are then packaged by the assembly plugin (see `pom.xml` of `flink-tests`). The `ClassLoaderITCase` sets up a cluster and submits the assembled JARs.
    
    Can we just follow this setup?



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56803957
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -109,22 +113,22 @@
     
     	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
     	
    -	private long taskCancellationIntervalMillis = ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS;
    +	private long taskCancellationIntervalMillis = -1;
     
     	// Serializers and types registered with Kryo and the PojoSerializer
     	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
     
    -	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> registeredTypesWithKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> defaultKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -	private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();
    +	private final UserCodeValue<LinkedHashSet<Class<?>>> registeredKryoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
     
    -	private final LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
    +	private final UserCodeValue<LinkedHashSet<Class<?>>> registeredPojoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
    --- End diff --
    
    That is true, I will fix it. Personally, I think this `GlobalJobParameters` is a huge crime. It basically allows you to store anything in the `ExecutionConfig`. Sometimes I'm overcome with the feeling that this `ExecutionConfig` is simply a dumping ground for all kinds of parameters which are somehow needed somewhere.



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56803053
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/UserCodeValue.java ---
    @@ -0,0 +1,98 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.Arrays;
    +
    +/**
    + * Value wrapper which automatically transfers the data as a serialized byte array. Upon reception
    + * the serialized byte array has to be explicitly deserialized providing a class loader with the
    + * correct classes.
    + *
    + * This is useful if the object would get deserialized by the system class loader which does not
    + * have access to the user code classes.
    + * @param <T>
    + */
    +public class UserCodeValue<T extends Serializable> implements Serializable {
    +	private static final long serialVersionUID = 6903400536729445732L;
    +
    +	private transient T value;
    +
    +	private byte[] serializedValue;
    +
    +	public UserCodeValue(T value) {
    +		this.value = value;
    +	}
    +
    +	public T getValue() {
    +		if (value == null && serializedValue != null) {
    +			throw new NullPointerException("The user code value has not been serialied. Call " +
    --- End diff --
    
    good catch. Thanks :-)



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56804605
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -109,22 +113,22 @@
     
     	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
     	
    -	private long taskCancellationIntervalMillis = ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS;
    +	private long taskCancellationIntervalMillis = -1;
     
     	// Serializers and types registered with Kryo and the PojoSerializer
     	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
     
    -	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> registeredTypesWithKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> defaultKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -	private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();
    +	private final UserCodeValue<LinkedHashSet<Class<?>>> registeredKryoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
     
    -	private final LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
    +	private final UserCodeValue<LinkedHashSet<Class<?>>> registeredPojoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
    --- End diff --
    
    Thanks for the fix.
    
    The original motivation for adding it was the following:
    Users requested a way to have one central configuration file for all of their job's UDFs. By setting global job parameters (for example the ParameterTool), they can use a properties file, and the key/value pairs from it are accessible from all UDFs.
    Also, the same user wanted to have these configuration properties accessible in the web interface. So if the `toMap()` method is overridden (the ParameterTool does that), the job parameters are accessible from the web interface.
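
A rough, self-contained sketch of the pattern described in this comment follows. It does not use Flink's classes: `GlobalParameters` is a stand-in for the `GlobalJobParameters` base class, and `JobSettings`, `fromProperties` and `get` are hypothetical names chosen for illustration. The only detail taken from the comment is that overriding `toMap()` is what exposes the key/value pairs for display.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustrative sketch only; these classes are stand-ins, not Flink's API.
final class GlobalJobParametersSketch {

    /** Stand-in base class: by default there is nothing to display. */
    static class GlobalParameters implements Serializable {
        private static final long serialVersionUID = 1L;

        public Map<String, String> toMap() {
            return Collections.emptyMap();
        }
    }

    /** Hypothetical user-defined parameters read from a properties file. */
    static final class JobSettings extends GlobalParameters {
        private static final long serialVersionUID = 1L;

        // Sorted map so that the displayed order is deterministic.
        private final TreeMap<String, String> values = new TreeMap<>();

        static JobSettings fromProperties(Properties properties) {
            JobSettings settings = new JobSettings();
            for (String key : properties.stringPropertyNames()) {
                settings.values.put(key, properties.getProperty(key));
            }
            return settings;
        }

        /** Lookup that a user function could perform at runtime. */
        String get(String key, String defaultValue) {
            String value = values.get(key);
            return value != null ? value : defaultValue;
        }

        /** Overriding toMap() is what makes the pairs visible to a UI. */
        @Override
        public Map<String, String> toMap() {
            return Collections.unmodifiableMap(values);
        }
    }

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        // Inline text stands in for the central properties file.
        properties.load(new StringReader("input.path=/tmp/in\nparallelism=4\n"));

        GlobalParameters parameters = JobSettings.fromProperties(properties);
        System.out.println(parameters.toMap());
    }
}
```

Because such a subclass is user code, an instance stored in the configuration has the same class loading requirement as the registered types discussed in this thread.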




[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-204368378
  
    That is a good idea @uce :-) I'll add a corresponding test for the `ExecutionConfig`.



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-204452690
  
    The failing test case seems to be unrelated.



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-200802077
  
    The changes look good and make parts where user code is serialized very clear. :+1: 
    
    I've verified that the `ScalaShellITCase` with the change in this PR fails for the current master. I'm wondering whether it is possible to make the test for this more explicit by adding a JAR with the example program outlined in the JIRA issue to the `ClassLoaderITCase`.
    
    In any case, +1 to merge. It's your call whether you add a further class loader test or not.
    
    




[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56683381
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/UserCodeValue.java ---
    @@ -0,0 +1,98 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.Arrays;
    +
    +/**
    + * Value wrapper which automatically transfers the data as a serialized byte array. Upon reception
    + * the serialized byte array has to be explicitly deserialized providing a class loader with the
    + * correct classes.
    + *
    + * This is useful if the object would get deserialized by the system class loader which does not
    + * have access to the user code classes.
    + * @param <T>
    + */
    +public class UserCodeValue<T extends Serializable> implements Serializable {
    +	private static final long serialVersionUID = 6903400536729445732L;
    +
    +	private transient T value;
    +
    +	private byte[] serializedValue;
    +
    +	public UserCodeValue(T value) {
    +		this.value = value;
    +	}
    +
    +	public T getValue() {
    +		if (value == null && serializedValue != null) {
    +			throw new NullPointerException("The user code value has not been serialied. Call " +
    --- End diff --
    
    "serialized" -> "deserialized"?



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56676581
  
    --- Diff: docs/setup/config.md ---
    @@ -183,7 +184,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
     - `akka.transport.threshold`: Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value (DEFAULT: **300**).
     - `akka.tcp.timeout`: Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: **akka.ask.timeout**).
     - `akka.throughput`: Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT: **15**).
    -- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of events. Set this value to 'on' in case of debugging (DEFAULT: **off**).
    +- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of events. Set this value to 'on' in case of debugging (DEFAULT: **false**).
    --- End diff --
    
    Should this say, "Set this value to 'true' in case ..."?



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-200366856
  
    Yes I think so.



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-199363606
  
    Due to the tight coupling of the ExecutionConfig and multiple Flink components
    (e.g. PojoSerializer), the automatic serialization and manual deserialization of
    user code objects via the UserCodeValue class caused problems. In order to minimize
    the impact of the changes, I changed the serialization strategy to an explicit one.
    One has to call `ExecutionConfig.serializeUserCode`, which stores the user code objects
    in a SerializedValue object and nulls the corresponding member fields. If that is not
    done, it is assumed that the object is deserialized using a user code class loader.
    On the receiving side, one has to call `ExecutionConfig.deserializeConfig`, providing
    a class loader which knows the user code classes.
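
The explicit strategy described in this comment could look roughly like the following self-contained sketch. It is not the code from this pull request: `ConfigSketch`, `registerType`, `getRegisteredTypes` and `deserializeUserCode` are hypothetical names, and a plain `byte[]` stands in for the `SerializedValue` object mentioned above. The sender moves the user code state into bytes and nulls the field; the receiver restores it with a class loader that knows the user classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.LinkedHashSet;

// Illustrative sketch only; not the ExecutionConfig implementation.
final class ExplicitUserCodeSerializationSketch {

    static final class ConfigSketch implements Serializable {
        private static final long serialVersionUID = 1L;

        // May reference user classes; null while only the serialized form is held.
        private LinkedHashSet<Class<?>> registeredTypes = new LinkedHashSet<>();

        // Stand-in for the SerializedValue mentioned in the comment above.
        private byte[] serializedUserCode;

        void registerType(Class<?> type) {
            registeredTypes.add(type);
        }

        LinkedHashSet<Class<?>> getRegisteredTypes() {
            return registeredTypes;
        }

        /** Sender side: move the user code state into bytes and null the field. */
        void serializeUserCode() throws IOException {
            if (registeredTypes == null) {
                return; // already in serialized form
            }
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(registeredTypes);
            }
            serializedUserCode = bytes.toByteArray();
            registeredTypes = null;
        }

        /** Receiver side: restore the state with a loader that knows the user classes. */
        @SuppressWarnings("unchecked")
        void deserializeUserCode(final ClassLoader userCodeClassLoader)
                throws IOException, ClassNotFoundException {
            if (serializedUserCode == null) {
                return; // the sender skipped the explicit step, nothing to restore
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(serializedUserCode)) {
                @Override
                protected Class<?> resolveClass(ObjectStreamClass desc)
                        throws IOException, ClassNotFoundException {
                    try {
                        return Class.forName(desc.getName(), false, userCodeClassLoader);
                    } catch (ClassNotFoundException e) {
                        return super.resolveClass(desc);
                    }
                }
            }) {
                registeredTypes = (LinkedHashSet<Class<?>>) in.readObject();
            }
            serializedUserCode = null;
        }
    }

    public static void main(String[] args) throws Exception {
        ConfigSketch config = new ConfigSketch();
        config.registerType(String.class); // a JDK class stands in for a user class
        config.serializeUserCode();
        System.out.println("field while serialized: " + config.getRegisteredTypes());
        config.deserializeUserCode(ExplicitUserCodeSerializationSketch.class.getClassLoader());
        System.out.println("field after restoring: " + config.getRegisteredTypes());
    }
}
```

Compared with a wrapper that serializes itself automatically, this variant keeps the field types unchanged for every component that reads the configuration, at the price of two calls that the sender and receiver must not forget.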



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56683616
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
    @@ -499,7 +514,7 @@ else if (current == ExecutionState.CANCELING) {
     					jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout);
     
     			Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
    -					executionConfig, taskInfo, jobConfiguration, taskConfiguration,
    +				executionConfig, taskInfo, jobConfiguration, taskConfiguration,
    --- End diff --
    
    Formatting regression?



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56699092
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -109,22 +113,22 @@
     
     	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
     	
    -	private long taskCancellationIntervalMillis = ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS;
    +	private long taskCancellationIntervalMillis = -1;
     
     	// Serializers and types registered with Kryo and the PojoSerializer
     	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
     
    -	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> registeredTypesWithKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -	private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> defaultKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();
    +	private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -	private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();
    +	private final UserCodeValue<LinkedHashSet<Class<?>>> registeredKryoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
     
    -	private final LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
    +	private final UserCodeValue<LinkedHashSet<Class<?>>> registeredPojoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
    --- End diff --
    
    You forgot to put the `GlobalJobParameters` into a `UserCodeValue`. That one can also contain usercode.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-204399612
  
    I've added an additional test case for registered user code types in the `ExecutionConfig`. I will rebase this PR and, if Travis gives the green light, I'll merge it.



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-200821794
  
    I agree that having a dedicated class loading test would be nicer. However, we would need to build a jar containing the test job before executing the test, right? So the easiest solution for that would be a new module. But that is also really ugly. @uce do you have a better idea for that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-200367314
  
    okay, thank you.



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-200364341
  
    The issue has been introduced after the 1.0 release, so we don't need to include this in 1.0.1?



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56676850
  
    --- Diff: docs/setup/config.md ---
    @@ -183,7 +184,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
     - `akka.transport.threshold`: Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value (DEFAULT: **300**).
     - `akka.tcp.timeout`: Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT: **akka.ask.timeout**).
     - `akka.throughput`: Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT: **15**).
    -- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of events. Set this value to 'on' in case of debugging (DEFAULT: **off**).
    +- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of events. Set this value to 'on' in case of debugging (DEFAULT: **false**).
    --- End diff --
    
    Yes, definitely. Thanks for spotting it :-)



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56803087
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
    @@ -499,7 +514,7 @@ else if (current == ExecutionState.CANCELING) {
     					jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout);
     
     			Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
    -					executionConfig, taskInfo, jobConfiguration, taskConfiguration,
    +				executionConfig, taskInfo, jobConfiguration, taskConfiguration,
    --- End diff --
    
    True, will revert it.



[GitHub] flink pull request: [FLINK-3633] Fix user code de/serialization in...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1818

