Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/08/12 13:47:52 UTC

[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2365

    [FLINK-4383] [rpc] Eagerly serialize remote rpc invocation messages

    This PR introduces an eager serialization for remote rpc invocation messages.
    That way it is possible to check whether the message is serializable and
    whether it exceeds the maximum allowed akka frame size. If either of these
    constraints is violated, a proper exception is thrown instead of simply
    swallowing the exception, as Akka does.
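
The check described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the PR: it uses plain Java serialization, and the class `EagerSerializationSketch` and the method `serializeOrFail` are hypothetical names. The `maximumFramesize` parameter mirrors the constructor argument visible in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Hypothetical helper (not part of Flink): serializes a message up front so that
// serialization and size problems surface at the call site.
final class EagerSerializationSketch {

    // Serializes the message with Java serialization and rejects it if the
    // result is larger than maximumFramesize bytes.
    static byte[] serializeOrFail(Object message, long maximumFramesize) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // Throws NotSerializableException (an IOException) for non-serializable objects.
            out.writeObject(message);
        }
        byte[] payload = bytes.toByteArray();
        if (payload.length > maximumFramesize) {
            throw new IOException("Serialized message of " + payload.length
                + " bytes exceeds the maximum frame size of " + maximumFramesize + " bytes.");
        }
        return payload;
    }
}
```

Because both failure modes are raised as an `IOException` before anything is handed to the transport, the caller sees the problem instead of a silently dropped message.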

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink rpcSerializability

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2365.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2365
    
----
commit 7dcac59723130bd9a77f34cea574f8d7303305f8
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-08-12T08:32:30Z

    [FLINK-4383] [rpc] Eagerly serialize remote rpc invocation messages
    
    This PR introduces an eager serialization for remote rpc invocation messages.
    That way it is possible to check whether the message is serializable and
    whether it exceeds the maximum allowed akka frame size. If either of these
    constraints is violated, a proper exception is thrown instead of simply
    swallowing the exception as Akka does it.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2365#discussion_r74688165
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -25,33 +25,42 @@
     import org.apache.flink.runtime.rpc.MainThreadExecutor;
     import org.apache.flink.runtime.rpc.RpcTimeout;
     import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
    +import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
    +import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
     import org.apache.flink.util.Preconditions;
    +import org.apache.log4j.Logger;
     import scala.concurrent.Await;
     import scala.concurrent.Future;
     import scala.concurrent.duration.FiniteDuration;
     
    +import java.io.IOException;
     import java.lang.annotation.Annotation;
     import java.lang.reflect.InvocationHandler;
     import java.lang.reflect.Method;
     import java.util.BitSet;
     import java.util.concurrent.Callable;
     
     /**
    - * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the
    - * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
    + * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the
    + * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
      * executed.
      */
     class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor {
    +	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
    +
     	private final ActorRef rpcServer;
     
     	// default timeout for asks
     	private final Timeout timeout;
     
    -	AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) {
    +	private final long maximumFramesize;
    +
    +	AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout, long maximumFramesize) {
    --- End diff --
    
    How expensive is the `isLocalActorRef()` call? Does it make sense to call it once here and have a flag `isLocal`?



[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2365
  
    This is a great feature and it looks very good overall. Minor comments inline.
    
    Big +1 overall



[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2365#discussion_r74737756
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -25,33 +25,42 @@
     import org.apache.flink.runtime.rpc.MainThreadExecutor;
     import org.apache.flink.runtime.rpc.RpcTimeout;
     import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
    +import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
    +import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
     import org.apache.flink.util.Preconditions;
    +import org.apache.log4j.Logger;
     import scala.concurrent.Await;
     import scala.concurrent.Future;
     import scala.concurrent.duration.FiniteDuration;
     
    +import java.io.IOException;
     import java.lang.annotation.Annotation;
     import java.lang.reflect.InvocationHandler;
     import java.lang.reflect.Method;
     import java.util.BitSet;
     import java.util.concurrent.Callable;
     
     /**
    - * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the
    - * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
    + * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the
    + * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
      * executed.
      */
     class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor {
    +	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
    +
     	private final ActorRef rpcServer;
    --- End diff --
    
    Good point, will change it.



[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2365#discussion_r74737963
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -25,33 +25,42 @@
     import org.apache.flink.runtime.rpc.MainThreadExecutor;
     import org.apache.flink.runtime.rpc.RpcTimeout;
     import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
    +import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
    +import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
     import org.apache.flink.util.Preconditions;
    +import org.apache.log4j.Logger;
     import scala.concurrent.Await;
     import scala.concurrent.Future;
     import scala.concurrent.duration.FiniteDuration;
     
    +import java.io.IOException;
     import java.lang.annotation.Annotation;
     import java.lang.reflect.InvocationHandler;
     import java.lang.reflect.Method;
     import java.util.BitSet;
     import java.util.concurrent.Callable;
     
     /**
    - * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the
    - * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
    + * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the
    + * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
      * executed.
      */
     class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor {
    +	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
    +
     	private final ActorRef rpcServer;
     
     	// default timeout for asks
     	private final Timeout timeout;
     
    -	AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) {
    +	private final long maximumFramesize;
    +
    +	AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout, long maximumFramesize) {
    --- End diff --
    
    `isLocalActorRef` should actually be quite cheap since it simply checks whether the host option of the actor's address is defined or not. But, anyway, it makes sense to check it once and store the result. Will add the change.
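
A rough sketch of "check it once and store the result", assuming that an address without a host denotes a local actor. `AddressSketch` and `InvocationHandlerSketch` are hypothetical stand-ins and do not use Akka's actual API.

```java
import java.util.Optional;

// Hypothetical stand-in for an actor address (not Akka's Address class).
final class AddressSketch {
    private final String host; // null when no host is defined

    AddressSketch(String host) {
        this.host = host;
    }

    Optional<String> host() {
        return Optional.ofNullable(host);
    }
}

// Hypothetical handler that decides locality once, at construction time.
final class InvocationHandlerSketch {
    private final boolean isLocal;

    InvocationHandlerSketch(AddressSketch address) {
        // Assumption: an undefined host means the actor lives in the same process.
        this.isLocal = !address.host().isPresent();
    }

    // Every invocation reads the cached flag instead of re-inspecting the address.
    String chooseMessageType() {
        return isLocal ? "LocalRpcInvocation" : "RemoteRpcInvocation";
    }
}
```

Since the address of a given actor reference does not change, computing the flag in the constructor keeps the per-call path to a single field read.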



[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2365#discussion_r74688148
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.akka.messages;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.SerializedValue;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +/**
    + * Remote rpc invocation message which is used when the actor communication is remote and, thus, the
    + * message has to be serialized.
    + * <p>
    + * In order to fail fast and report an appropriate error message to the user, the method name, the
    + * parameter types and the arguments are eagerly serialized. In case the the invocation call
    + * contains a non-serializable object, then an {@link IOException} is thrown.
    + */
    +public class RemoteRpcInvocation implements RpcInvocation, Serializable {
    +	private static final long serialVersionUID = 6179354390913843809L;
    +
    +	// Serialized invocation data
    +	private final SerializedValue<RemoteRpcInvocation.MethodInvocation> serializedMethodInvocation;
    +
    +	// Transient field which is lazily initialized upon first access to the invocation data
    +	private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
    +
    +	public  RemoteRpcInvocation(
    +		final String methodName,
    +		final Class<?>[] parameterTypes,
    +		final Object[] args) throws IOException {
    +
    +		serializedMethodInvocation = new SerializedValue<>(new RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
    --- End diff --
    
    Minor comment: Not really important here, but it is usually good practice to initialize the transient cache field as well.
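
To illustrate the suggestion, here is a minimal sketch of a serialized payload paired with a transient cache field that the constructor also sets. It assumes plain Java serialization in place of Flink's `SerializedValue`; `CachedInvocationSketch` and its helper methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical message class: keeps the eagerly serialized form plus a transient cache.
final class CachedInvocationSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Eagerly serialized data; this is what travels over the wire.
    private final byte[] serializedMethodName;

    // Transient cache; it is null again after this object has been deserialized.
    private transient String methodName;

    CachedInvocationSketch(String methodName) throws IOException {
        this.serializedMethodName = toBytes(methodName);
        // Initialize the cache right away so the sender never has to deserialize.
        this.methodName = methodName;
    }

    String getMethodName() throws IOException, ClassNotFoundException {
        if (methodName == null) {
            // Receiver side: restore the cache lazily on first access.
            methodName = (String) fromBytes(serializedMethodName);
        }
        return methodName;
    }

    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}
```

On the sending side the getter returns the value passed to the constructor; only a copy that went through serialization pays for deserializing the payload.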



[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/2365



[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2365
  
    Somehow this PR was not closed automatically.



[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2365#discussion_r74737803
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java ---
    @@ -0,0 +1,187 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.akka.messages;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.SerializedValue;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +/**
    + * Remote rpc invocation message which is used when the actor communication is remote and, thus, the
    + * message has to be serialized.
    + * <p>
    + * In order to fail fast and report an appropriate error message to the user, the method name, the
    + * parameter types and the arguments are eagerly serialized. In case the the invocation call
    + * contains a non-serializable object, then an {@link IOException} is thrown.
    + */
    +public class RemoteRpcInvocation implements RpcInvocation, Serializable {
    +	private static final long serialVersionUID = 6179354390913843809L;
    +
    +	// Serialized invocation data
    +	private final SerializedValue<RemoteRpcInvocation.MethodInvocation> serializedMethodInvocation;
    +
    +	// Transient field which is lazily initialized upon first access to the invocation data
    +	private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
    +
    +	public  RemoteRpcInvocation(
    +		final String methodName,
    +		final Class<?>[] parameterTypes,
    +		final Object[] args) throws IOException {
    +
    +		serializedMethodInvocation = new SerializedValue<>(new RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
    --- End diff --
    
    You're right. Will add it.



[GitHub] flink pull request #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc in...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2365#discussion_r74688005
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---
    @@ -25,33 +25,42 @@
     import org.apache.flink.runtime.rpc.MainThreadExecutor;
     import org.apache.flink.runtime.rpc.RpcTimeout;
     import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
    +import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
    +import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
     import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
     import org.apache.flink.util.Preconditions;
    +import org.apache.log4j.Logger;
     import scala.concurrent.Await;
     import scala.concurrent.Future;
     import scala.concurrent.duration.FiniteDuration;
     
    +import java.io.IOException;
     import java.lang.annotation.Annotation;
     import java.lang.reflect.InvocationHandler;
     import java.lang.reflect.Method;
     import java.util.BitSet;
     import java.util.concurrent.Callable;
     
     /**
    - * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the
    - * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
    + * Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the
    + * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
      * executed.
      */
     class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor {
    +	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
    +
     	private final ActorRef rpcServer;
    --- End diff --
    
    How about renaming this to `rpcEndpoint`? That way it would line up with the naming in other places.



[GitHub] flink issue #2365: [FLINK-4383] [rpc] Eagerly serialize remote rpc invocatio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2365
  
    Thanks for the review @StephanEwen. I've addressed your comments. Once Travis gives the green light, I'll merge it.

