Posted to reviews@bahir.apache.org by sbcd90 <gi...@git.apache.org> on 2016/10/25 20:25:35 UTC

[GitHub] bahir-flink pull request #8: [BAHIR-73][bahir-flink] flink-streaming-akka so...

GitHub user sbcd90 opened a pull request:

    https://github.com/apache/bahir-flink/pull/8

    [BAHIR-73][bahir-flink] flink-streaming-akka source connector

    This PR proposes a flink-streaming-akka source connector.
    
    The source connector can be used to receive messages from an Akka feeder or publisher actor; these messages can then be processed with Flink streaming.
    
    The source connector has the following features:
    
    - It supports several message formats, such as iterable data, byte arrays, and data with timestamps.
    - It can send acknowledgements back to the feeder actor.
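For illustration only, the acknowledgement feature can be modelled in plain Java without any Akka dependency. The sketch mirrors what the `ReceiverActor` in this PR does after emitting each message (reply `"ack"` to the sender when auto-ack is on). `ReplyChannel`, `AckingReceiverModel` and `CountingFeeder` are hypothetical stand-ins for Akka's `ActorRef`, the receiver actor and the feeder actor.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for Akka's ActorRef.tell(): the channel back to the feeder. */
interface ReplyChannel {
    void tell(Object message);
}

/** Plain-Java model of the receiver's ack behaviour; not the connector's real class. */
final class AckingReceiverModel {
    private final Consumer<Object> collector; // models SourceContext.collect
    private final boolean autoAck;

    AckingReceiverModel(Consumer<Object> collector, boolean autoAck) {
        this.collector = collector;
        this.autoAck = autoAck;
    }

    /** Emits the message downstream, then replies "ack" to the sender if enabled. */
    void onReceive(Object message, ReplyChannel sender) {
        collector.accept(message);
        if (autoAck) {
            sender.tell("ack");
        }
    }
}

/** Hypothetical feeder that counts the acknowledgements it gets back. */
final class CountingFeeder implements ReplyChannel {
    int acks = 0;

    @Override
    public void tell(Object message) {
        if ("ack".equals(message)) {
            acks++;
        }
    }
}
```

In the connector itself the reply is sent with `getSender().tell("ack", getSelf())`.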
    
    Reference to the closed PR in Flink: apache/flink#2644
    
    @rmetzger, kindly review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sbcd90/bahir-flink flink-akka

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir-flink/pull/8.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8
    
----
commit 5ff2802b439003f3a46908e8dc65a6dec8a45e0a
Author: Subhobrata Dey <sb...@gmail.com>
Date:   2016-10-25T20:21:07Z

    [BAHIR-73][bahir-flink] flink-streaming-akka source connector

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] bahir-flink pull request #8: [BAHIR-73][bahir-flink] flink-streaming-akka so...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir-flink/pull/8



[GitHub] bahir-flink pull request #8: [BAHIR-73][bahir-flink] flink-streaming-akka so...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/8#discussion_r89061655
  
    --- Diff: flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.akka;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.PoisonPill;
    +import akka.actor.Props;
    +import com.typesafe.config.Config;
    +import com.typesafe.config.ConfigFactory;
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.functions.StoppableFunction;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implementation of {@link SourceFunction} specialized to read messages
    + * from Akka actors.
    + */
    +public class AkkaSource extends RichSourceFunction<Object>
    +  implements StoppableFunction {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class);
    +
    +  private static final long serialVersionUID = 1L;
    +
    +  // --- Fields set by the constructor
    +
    +  private final Class<?> classForActor;
    +
    +  private final String actorName;
    +
    +  private final String urlOfPublisher;
    +
    +  // --- Runtime fields
    +  private transient ActorSystem receiverActorSystem;
    +  private transient ActorRef receiverActor;
    +  private transient Object waitLock;
    +  private transient boolean running = true;
    +
    +  protected transient boolean autoAck;
    +
    +  /**
    +   * Creates {@link AkkaSource} for Streaming
    +   *
    +   * @param actorName Receiver Actor name
    +   * @param urlOfPublisher tcp url of the publisher or feeder actor
    +   */
    +  public AkkaSource(String actorName,
    +          String urlOfPublisher) {
    +    super();
    +    this.classForActor = ReceiverActor.class;
    +    this.actorName = actorName;
    +    this.urlOfPublisher = urlOfPublisher;
    +  }
    +
    +  @Override
    +  public void open(Configuration parameters) throws Exception {
    +    waitLock = new Object();
    +    receiverActorSystem = createDefaultActorSystem();
    +
    +    RuntimeContext runtimeContext = getRuntimeContext();
    +    if (runtimeContext instanceof StreamingRuntimeContext
    +      && ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled()) {
    +      autoAck = false;
    +    } else {
    +      autoAck = true;
    --- End diff --
    
    Why is the acking dependent on the checkpointing?
    Maybe it would make sense to allow the user to configure this independently.
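A minimal sketch of the independent setting suggested here, assuming the flag is passed as a `java.util.Properties` entry under `akka.remote.auto-ack` with the value `on` (the key the PR adopts in its revised `open()` and README). `AckSettings` is a hypothetical helper; it never looks at whether checkpointing is enabled, and it treats a missing key as "off".

```java
import java.util.Properties;

/** Hypothetical helper: resolves the auto-ack flag from user configuration only. */
final class AckSettings {
    static final String AUTO_ACK_KEY = "akka.remote.auto-ack";

    private AckSettings() {}

    /**
     * Returns true only when the user explicitly sets the key to "on".
     * The checkpointing mode is deliberately not consulted.
     */
    static boolean autoAck(Properties props) {
        return "on".equals(props.getProperty(AUTO_ACK_KEY));
    }
}
```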



[GitHub] bahir-flink issue #8: [BAHIR-73][bahir-flink] flink-streaming-akka source co...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the issue:

    https://github.com/apache/bahir-flink/pull/8
  
    Hello @rmetzger ,
    
    Thanks for the code review.
    
    I made the following code changes to address your points:
    
    - Added a README.
    - Added support for a user-specified configuration for the receiver actor system.
    - Made acknowledgements configurable by the user.
    - Refactored the code to move the `receiverActorSystem.awaitTermination()` call and removed the `byte[]` data handling.



[GitHub] bahir-flink pull request #8: [BAHIR-73][bahir-flink] flink-streaming-akka so...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/8#discussion_r89061030
  
    --- Diff: flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.akka.utils;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSelection;
    +import akka.actor.UntypedActor;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Generalized receiver actor which receives messages
    + * from the feeder or publisher actor.
    + */
    +public class ReceiverActor extends UntypedActor {
    +  // --- Fields set by the constructor
    +  private final SourceContext<Object> ctx;
    +
    +  private final String urlOfPublisher;
    +
    +  private final boolean autoAck;
    +
    +  // --- Runtime fields
    +  private ActorSelection remotePublisher;
    +
    +  public ReceiverActor(SourceContext<Object> ctx,
    +            String urlOfPublisher,
    +            boolean autoAck) {
    +    this.ctx = ctx;
    +    this.urlOfPublisher = urlOfPublisher;
    +    this.autoAck = autoAck;
    +  }
    +
    +  @Override
    +  public void preStart() throws Exception {
    +    remotePublisher = getContext().actorSelection(urlOfPublisher);
    +    remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
    +  }
    +
    +  @SuppressWarnings("unchecked")
    +  @Override
    +  public void onReceive(Object message)
    +    throws Exception {
    +    if (message instanceof Iterable) {
    +      collect((Iterable<Object>) message);
    +    } else if (message instanceof byte[]) {
    +      byte[] messageBytes = (byte[]) message;
    +      collect(messageBytes);
    +    } else if (message instanceof Tuple2) {
    +      Tuple2<Object, Long> messageTuple = (Tuple2<Object, Long>) message;
    +      collect(messageTuple.f0, messageTuple.f1);
    +    } else {
    +      collect(message);
    +    }
    +
    +    if (autoAck) {
    +      getSender().tell("ack", getSelf());
    +    }
    +  }
    +
    +  /**
    +   * To handle {@link Iterable} data
    +   *
    +   * @param data data received from feeder actor
    +   */
    +  private void collect(Iterable<Object> data) {
    +    Iterator<Object> iterator = data.iterator();
    +    while (iterator.hasNext()) {
    +      ctx.collect(iterator.next());
    +    }
    +  }
    +
    +  /**
    +   * To handle byte array data
    +   *
    +   * @param bytes data received from feeder actor
    +   */
    +  private void collect(byte[] bytes) {
    --- End diff --
    
    What is the purpose of this message? There is no special treatment for byte arrays.
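Since a `byte[]` is just another `Object`, the generic `else` branch already forwards it unchanged, so the dedicated branch adds nothing. The plain-Java model below sketches the remaining dispatch without Flink or Akka types. `Timestamped` is a hypothetical stand-in for `Tuple2<Object, Long>`, and the two callbacks stand in for `SourceContext.collect` and, presumably, `SourceContext.collectWithTimestamp` (the body of the two-argument `collect` is not shown in this diff).

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical stand-in for Tuple2<Object, Long>: a value plus its timestamp. */
final class Timestamped {
    final Object value;
    final long timestamp;

    Timestamped(Object value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Plain-Java model of the receiver's message dispatch, without a byte[] branch. */
final class MessageDispatcher {
    private final Consumer<Object> collect;                 // models ctx.collect
    private final BiConsumer<Object, Long> collectWithTime; // models ctx.collectWithTimestamp

    MessageDispatcher(Consumer<Object> collect, BiConsumer<Object, Long> collectWithTime) {
        this.collect = collect;
        this.collectWithTime = collectWithTime;
    }

    void onReceive(Object message) {
        if (message instanceof Iterable) {
            // Flatten: every element of the iterable becomes its own record.
            for (Object element : (Iterable<?>) message) {
                collect.accept(element);
            }
        } else if (message instanceof Timestamped) {
            Timestamped t = (Timestamped) message;
            collectWithTime.accept(t.value, t.timestamp);
        } else {
            // byte[] and any other payload are forwarded as-is.
            collect.accept(message);
        }
    }
}
```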



[GitHub] bahir-flink issue #8: [BAHIR-73][bahir-flink] flink-streaming-akka source co...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/8
  
    Thank you for opening this pull request.
    I'll review it soon.



[GitHub] bahir-flink issue #8: [BAHIR-73][bahir-flink] flink-streaming-akka source co...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/8
  
    Thank you for addressing the issues so quickly. I found a few more issues; once they are addressed, we can merge the code.
    
    Did you test-run the code on a cluster? It would be interesting to see what throughput the source can achieve.



[GitHub] bahir-flink issue #8: [BAHIR-73][bahir-flink] flink-streaming-akka source co...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/8
  
    Great, I'm going to merge this PR now.



[GitHub] bahir-flink pull request #8: [BAHIR-73][bahir-flink] flink-streaming-akka so...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/8#discussion_r89286046
  
    --- Diff: flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.akka;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.PoisonPill;
    +import akka.actor.Props;
    +import com.typesafe.config.Config;
    +import com.typesafe.config.ConfigFactory;
    +import org.apache.flink.api.common.functions.StoppableFunction;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Properties;
    +
    +/**
    + * Implementation of {@link SourceFunction} specialized to read messages
    + * from Akka actors.
    + */
    +public class AkkaSource extends RichSourceFunction<Object>
    +  implements StoppableFunction {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class);
    +
    +  private static final long serialVersionUID = 1L;
    +
    +  // --- Fields set by the constructor
    +
    +  private final Class<?> classForActor;
    +
    +  private final String actorName;
    +
    +  private final String urlOfPublisher;
    +
    +  // --- Runtime fields
    +  private transient ActorSystem receiverActorSystem;
    +  private transient ActorRef receiverActor;
    +  private transient Object waitLock;
    +  private transient boolean running = true;
    +
    +  protected transient boolean autoAck;
    +
    +  /**
    +   * Creates {@link AkkaSource} for Streaming
    +   *
    +   * @param actorName Receiver Actor name
    +   * @param urlOfPublisher tcp url of the publisher or feeder actor
    +   */
    +  public AkkaSource(String actorName,
    +          String urlOfPublisher) {
    +    super();
    +    this.classForActor = ReceiverActor.class;
    +    this.actorName = actorName;
    +    this.urlOfPublisher = urlOfPublisher;
    +  }
    +
    +  @Override
    +  public void open(Configuration parameters) throws Exception {
    +    Properties customProperties = new Properties();
    +    parameters.addAllToProperties(customProperties);
    +
    +    waitLock = new Object();
    +    receiverActorSystem = createDefaultActorSystem(customProperties);
    +
    +    if (customProperties.containsKey("akka.remote.auto-ack") &&
    +      customProperties.getProperty("akka.remote.auto-ack").equals("on")) {
    +      autoAck = true;
    +    } else {
    +      autoAck = false;
    +    }
    +  }
    +
    +  @Override
    +  public void run(SourceFunction.SourceContext<Object> ctx) throws Exception {
    +    LOG.info("Starting the Receiver actor {}", actorName);
    +    receiverActor = receiverActorSystem.actorOf(
    +      Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName);
    +
    +    running = true;
    +    LOG.info("Started the Receiver actor {} successfully", actorName);
    +    receiverActorSystem.awaitTermination();
    +
    +    while (running) {
    +      synchronized (waitLock) {
    +        waitLock.wait(100L);
    --- End diff --
    
    I don't think the wait lock is needed anymore when doing `awaitTermination()`.
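The reviewer's point, sketched in plain Java: once `run()` blocks until the actor system terminates, the 100 ms `waitLock` polling loop is redundant, and cancelling only has to trigger termination. A `CountDownLatch` stands in for the actor system's termination here; `BlockingSourceModel` is hypothetical and not the connector's code.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical model of a source whose run() blocks until the actor system terminates. */
final class BlockingSourceModel {
    // Released exactly once, when the (modelled) actor system has terminated.
    private final CountDownLatch terminated = new CountDownLatch(1);

    /** Blocks the source thread; no wait lock or polling loop is needed. */
    void run() {
        // ... the receiver actor would be started here ...
        try {
            terminated.await(); // models receiverActorSystem.awaitTermination()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    /** Called from another thread by cancel()/stop(); unblocks run(). */
    void cancel() {
        terminated.countDown(); // models shutting the actor system down
    }
}
```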



[GitHub] bahir-flink pull request #8: [BAHIR-73][bahir-flink] flink-streaming-akka so...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/8#discussion_r89286832
  
    --- Diff: flink-connector-akka/README.md ---
    @@ -0,0 +1,45 @@
    +# Flink Akka connector
    +
    +This connector provides a sink to [Akka](http://akka.io/) source actors in an ActorSystem.
    +To use this connector, add the following dependency to your project:
    +
    +
    +    <dependency>
    +      <groupId>org.apache.bahir</groupId>
    +      <artifactId>flink-connector-akka_2.11</artifactId>
    +      <version>1.0.0-SNAPSHOT</version>
    +    </dependency>
    +    
    +*Version Compatibility*: This module is compatible with Akka 2.0+.
    +    
    +## Configuration
    +    
    +The configurations for the Receiver Actor System in Flink Akka connector can be created using the `Configuration (org.apache.flink.configuration.Configuration)` object in Flink.
    +    
    +To enable acknowledgements, the custom configuration `akka.remote.auto-ack` can be used.
    +
    +The user can set any of the default configuration allowed by Akka as well as custom configuration allowed by the connector.
    +   
    +A sample configuration can be defined as follows:
    +    
    +    Configuration configuration = new Configuration();
    +    configuration.setString("akka.loglevel", "INFO");
    +    configuration.setString("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
    +    configuration.setString("akka.remote.netty.tcp.hostname", "127.0.0.1");
    +    configuration.setString("akka.remote.enabled-transports", "[akka.remote.netty.tcp]");
    +    configuration.setString("akka.remote.netty.tcp.port", "5150");
    +    configuration.setString("akka.remote.log-sent-messages", "on");
    +    configuration.setString("akka.remote.log-received-messages", "on");
    +    configuration.setString("akka.remote.auto-ack", "on");    
    --- End diff --
    
    How can a user pass the `configuration` to the Akka source? AFAIK it's not possible, because `open(Configuration c)` is not really supported in the DataStream API of Flink.
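A common workaround is to pass the settings through the source's constructor and keep them in a serializable field, so they are shipped with the function and are available again in `open()`. The plain-Java sketch below assumes the settings fit in a `Map<String, String>`; `ConfiguredSourceModel` is hypothetical, and a Java serialization round trip stands in for Flink shipping the function to the workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical source model: configuration is fixed at construction time. */
class ConfiguredSourceModel implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable copy of the user's settings, shipped together with the function.
    private final HashMap<String, String> settings;

    // Runtime state, rebuilt in open() after deserialization.
    private transient boolean autoAck;

    ConfiguredSourceModel(Map<String, String> settings) {
        this.settings = new HashMap<>(settings);
    }

    /** Models RichFunction.open(): reads the settings captured by the constructor. */
    void open() {
        autoAck = "on".equals(settings.get("akka.remote.auto-ack"));
    }

    boolean isAutoAck() {
        return autoAck;
    }

    /** Round-trips the object through Java serialization, as a stand-in for deployment. */
    static ConfiguredSourceModel ship(ConfiguredSourceModel source)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ConfiguredSourceModel) in.readObject();
        }
    }
}
```

Keeping `autoAck` transient and deriving it in `open()` means only the plain settings map has to be serializable.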



[GitHub] bahir-flink issue #8: [BAHIR-73][bahir-flink] flink-streaming-akka source co...

Posted by sbcd90 <gi...@git.apache.org>.
Github user sbcd90 commented on the issue:

    https://github.com/apache/bahir-flink/pull/8
  
    Hello @rmetzger,
    
    Thanks for the code review.
    
    I refactored the code to accommodate the following changes:
    
    - removed the wait lock
    - moved the configuration to the constructor
    - removed the `byte[]` handling
    
    I ran some basic tests in non-cluster mode using the following code:
    
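    ```
    import akka.actor.ActorSystem;
    import akka.actor.Props;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import java.io.File;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.akka.AkkaSource;
    
    // AkkaApp and FeederActor are helper classes from the local test setup (not shown).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    AkkaApp app = new AkkaApp();
    
    // Start the feeder (publisher) actor in its own actor system.
    Config feederActorConfig = ConfigFactory.parseFile(new File(app.getFeederConfigFile()));
    ActorSystem feederActorSystem = ActorSystem.create("feederActorSystem", feederActorConfig);
    feederActorSystem.actorOf(Props.create(FeederActor.class), "feederActor");
    
    // Subscribe the Akka source to the feeder actor and print what it receives.
    Config config = ConfigFactory.parseFile(new File(app.getReceiverConfigFile()));
    String feederActorUrl = "akka.tcp://feederActorSystem@127.0.0.1:5156/user/feederActor";
    
    DataStream<Object> source = env.addSource(new AkkaSource("receiverActor", feederActorUrl, config));
    source.print();
    
    env.execute();
    ```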
    
    I will try a test run in cluster mode to measure the throughput.

