You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2016/11/24 08:52:30 UTC

bahir-flink git commit: [BAHIR-73][bahir-flink] flink-streaming-akka source connector

Repository: bahir-flink
Updated Branches:
  refs/heads/master 2d1225f9a -> a830077a9


[BAHIR-73][bahir-flink] flink-streaming-akka source connector

This closes #8


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/a830077a
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/a830077a
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/a830077a

Branch: refs/heads/master
Commit: a830077a9b8145d5fb0718619b7055c8d7d1b14e
Parents: 2d1225f
Author: Subhobrata Dey <sb...@gmail.com>
Authored: Tue Oct 25 16:21:07 2016 -0400
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Nov 24 09:51:22 2016 +0100

----------------------------------------------------------------------
 flink-connector-akka/README.md                  |  37 +++
 flink-connector-akka/pom.xml                    |  85 ++++++
 .../streaming/connectors/akka/AkkaSource.java   | 147 +++++++++++
 .../streaming/connectors/akka/package-info.java |  21 ++
 .../connectors/akka/utils/ReceiverActor.java    | 110 ++++++++
 .../akka/utils/SubscribeReceiver.java           |  58 +++++
 .../akka/utils/UnsubscribeReceiver.java         |  58 +++++
 .../connectors/akka/AkkaSourceTest.java         | 256 +++++++++++++++++++
 .../connectors/akka/utils/FeederActor.java      |  99 +++++++
 .../connectors/akka/utils/Message.java          |  25 ++
 .../src/test/resources/feeder_actor.conf        |  33 +++
 .../src/test/resources/log4j.properties         |  27 ++
 pom.xml                                         |   1 +
 13 files changed, 957 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-akka/README.md b/flink-connector-akka/README.md
new file mode 100644
index 0000000..b3b64e5
--- /dev/null
+++ b/flink-connector-akka/README.md
@@ -0,0 +1,37 @@
+# Flink Akka connector
+
+This connector provides a sink to [Akka](http://akka.io/) source actors in an ActorSystem.
+To use this connector, add the following dependency to your project:
+
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-akka_2.11</artifactId>
+      <version>1.0.0-SNAPSHOT</version>
+    </dependency>
+    
+*Version Compatibility*: This module is compatible with Akka 2.0+.
+    
+## Configuration
+    
+The configurations for the Receiver Actor System in Flink Akka connector can be created using the standard typesafe `Config (com.typesafe.config.Config)` object.
+    
+To enable acknowledgements, the custom configuration `akka.remote.auto-ack` can be used.
+
+The user can set any of the default configurations allowed by Akka as well as custom configurations allowed by the connector.
+   
+A sample configuration can be defined as follows:
+    
+    String configFile = getClass().getClassLoader()
+          .getResource("feeder_actor.conf").getFile();
+    Config config = ConfigFactory.parseFile(new File(configFile));    
+    
+## Message Types
+    
+There are 3 different kind of message types which the receiver Actor in flink akka connector can receive.
+    
+- message containing `Iterable<Object>` data
+   
+- message containing generic `Object` data
+   
+- message containing generic `Object` data and a `Timestamp` value passed as `Tuple2<Object, Long>`.   
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-akka/pom.xml b/flink-connector-akka/pom.xml
new file mode 100644
index 0000000..df82563
--- /dev/null
+++ b/flink-connector-akka/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink_parent_2.11</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-akka_2.11</artifactId>
+    <name>flink-connector-akka</name>
+    <url>http://bahir.apache.org/</url>
+    <packaging>jar</packaging>
+
+    <properties>
+        <scala.binary.version>2.11</scala.binary.version>
+        <mockito.version>1.10.19</mockito.version>
+        <akka.version>2.3.15</akka.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_${scala.binary.version}</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-remote_${scala.binary.version}</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
new file mode 100644
index 0000000..3925d0b
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to read messages
+ * from Akka actors.
+ */
+public class AkkaSource extends RichSourceFunction<Object>
+  implements StoppableFunction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class);
+
+  private static final long serialVersionUID = 1L;
+
+  // --- Fields set by the constructor
+
+  private final Class<?> classForActor;
+
+  private final String actorName;
+
+  private final String urlOfPublisher;
+
+  private final Config configuration;
+
+  // --- Runtime fields
+  private transient ActorSystem receiverActorSystem;
+  private transient ActorRef receiverActor;
+
+  protected transient boolean autoAck;
+
+  /**
+   * Creates {@link AkkaSource} for Streaming
+   *
+   * @param actorName Receiver Actor name
+   * @param urlOfPublisher tcp url of the publisher or feeder actor
+   */
+  public AkkaSource(String actorName,
+          String urlOfPublisher,
+          Config configuration) {
+    super();
+    this.classForActor = ReceiverActor.class;
+    this.actorName = actorName;
+    this.urlOfPublisher = urlOfPublisher;
+    this.configuration = configuration;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    receiverActorSystem = createDefaultActorSystem();
+
+    if (configuration.hasPath("akka.remote.auto-ack") &&
+      configuration.getString("akka.remote.auto-ack").equals("on")) {
+      autoAck = true;
+    } else {
+      autoAck = false;
+    }
+  }
+
+  @Override
+  public void run(SourceFunction.SourceContext<Object> ctx) throws Exception {
+    LOG.info("Starting the Receiver actor {}", actorName);
+    receiverActor = receiverActorSystem.actorOf(
+      Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName);
+
+    LOG.info("Started the Receiver actor {} successfully", actorName);
+    receiverActorSystem.awaitTermination();
+  }
+
+  @Override
+  public void close() {
+    LOG.info("Closing source");
+    if (receiverActorSystem != null) {
+      receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+      receiverActorSystem.shutdown();
+    }
+  }
+
+  @Override
+  public void cancel() {
+    LOG.info("Cancelling akka source");
+    close();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping akka source");
+    close();
+  }
+
+  /**
+   * Creates an actor system with default configurations for Receiver actor.
+   *
+   * @return Actor System instance with default configurations
+   */
+  private ActorSystem createDefaultActorSystem() {
+    String defaultActorSystemName = "receiver-actor-system";
+
+    Config finalConfig = getOrCreateMandatoryProperties(configuration);
+
+    return ActorSystem.create(defaultActorSystemName, finalConfig);
+  }
+
+  private Config getOrCreateMandatoryProperties(Config properties) {
+    if (!properties.hasPath("akka.actor.provider")) {
+      properties = properties.withValue("akka.actor.provider",
+        ConfigValueFactory.fromAnyRef("akka.remote.RemoteActorRefProvider"));
+    }
+
+    if (!properties.hasPath("akka.remote.enabled-transports")) {
+      properties = properties.withValue("akka.remote.enabled-transports",
+        ConfigValueFactory.fromAnyRef(Collections.singletonList("akka.remote.netty.tcp")));
+    }
+    return properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java
new file mode 100644
index 0000000..ecea4c4
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Akka receiver for Flink Streaming.
+ */
+package org.apache.flink.streaming.connectors.akka;

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java
new file mode 100644
index 0000000..09913a0
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+import java.util.Iterator;
+
+/**
+ * Generalized receiver actor which receives messages
+ * from the feeder or publisher actor.
+ */
+public class ReceiverActor extends UntypedActor {
+  // --- Fields set by the constructor
+  private final SourceContext<Object> ctx;
+
+  private final String urlOfPublisher;
+
+  private final boolean autoAck;
+
+  // --- Runtime fields
+  private ActorSelection remotePublisher;
+
+  public ReceiverActor(SourceContext<Object> ctx,
+            String urlOfPublisher,
+            boolean autoAck) {
+    this.ctx = ctx;
+    this.urlOfPublisher = urlOfPublisher;
+    this.autoAck = autoAck;
+  }
+
+  @Override
+  public void preStart() throws Exception {
+    remotePublisher = getContext().actorSelection(urlOfPublisher);
+    remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void onReceive(Object message)
+    throws Exception {
+    if (message instanceof Iterable) {
+      collect((Iterable<Object>) message);
+    } else if (message instanceof Tuple2) {
+      Tuple2<Object, Long> messageTuple = (Tuple2<Object, Long>) message;
+      collect(messageTuple.f0, messageTuple.f1);
+    } else {
+      collect(message);
+    }
+
+    if (autoAck) {
+      getSender().tell("ack", getSelf());
+    }
+  }
+
+  /**
+   * To handle {@link Iterable} data
+   *
+   * @param data data received from feeder actor
+   */
+  private void collect(Iterable<Object> data) {
+    Iterator<Object> iterator = data.iterator();
+    while (iterator.hasNext()) {
+      ctx.collect(iterator.next());
+    }
+  }
+
+  /**
+   * To handle single data
+   * @param data data received from feeder actor
+   */
+  private void collect(Object data) {
+    ctx.collect(data);
+  }
+
+  /**
+   * To handle data with timestamp
+   *
+   * @param data data received from feeder actor
+   * @param timestamp timestamp received from feeder actor
+   */
+  private void collect(Object data, long timestamp) {
+    ctx.collectWithTimestamp(data, timestamp);
+  }
+
+  @Override
+  public void postStop() throws Exception {
+    remotePublisher.tell(new UnsubscribeReceiver(ActorRef.noSender()),
+      ActorRef.noSender());
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java
new file mode 100644
index 0000000..735e3e9
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+
+import java.io.Serializable;
+
+/**
+ * General interface used by Receiver Actor to subscribe
+ * to the publisher.
+ */
+public class SubscribeReceiver implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private ActorRef receiverActor;
+
+  public SubscribeReceiver(ActorRef receiverActor) {
+    this.receiverActor = receiverActor;
+  }
+
+  public void setReceiverActor(ActorRef receiverActor) {
+    this.receiverActor = receiverActor;
+  }
+
+  public ActorRef getReceiverActor() {
+    return receiverActor;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof SubscribeReceiver) {
+      SubscribeReceiver other = (SubscribeReceiver) obj;
+      return other.canEquals(this) && super.equals(other)
+        && receiverActor.equals(other.getReceiverActor());
+    } else {
+      return false;
+    }
+  }
+
+  public boolean canEquals(Object obj) {
+    return obj instanceof SubscribeReceiver;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java
new file mode 100644
index 0000000..8c6dde2
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+
+import java.io.Serializable;
+
+/**
+ * General interface used by Receiver Actor to un subscribe.
+ */
+public class UnsubscribeReceiver implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private ActorRef receiverActor;
+
+  public UnsubscribeReceiver(ActorRef receiverActor) {
+    this.receiverActor = receiverActor;
+  }
+
+  public void setReceiverActor(ActorRef receiverActor) {
+    this.receiverActor = receiverActor;
+  }
+
+  public ActorRef getReceiverActor() {
+    return receiverActor;
+  }
+
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof UnsubscribeReceiver) {
+      UnsubscribeReceiver other = (UnsubscribeReceiver) obj;
+      return other.canEquals(this) && super.equals(other)
+        && receiverActor.equals(other.getReceiverActor());
+    } else {
+      return false;
+    }
+  }
+
+  public boolean canEquals(Object obj) {
+    return obj instanceof UnsubscribeReceiver;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
new file mode 100644
index 0000000..e7114d7
--- /dev/null
+++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.akka.utils.FeederActor;
+import org.apache.flink.streaming.connectors.akka.utils.Message;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AkkaSourceTest {
+  private AkkaSource source;
+
+  private static final String feederActorName = "JavaFeederActor";
+  private static final String receiverActorName = "receiverActor";
+  private static final String urlOfFeeder =
+    "akka.tcp://feederActorSystem@127.0.0.1:5150/user/" + feederActorName;
+  private ActorSystem feederActorSystem;
+
+  private Configuration config = new Configuration();
+  private Config sourceConfiguration = ConfigFactory.empty();
+
+  private Thread sourceThread;
+
+  private SourceFunction.SourceContext<Object> sourceContext;
+
+  private volatile Exception exception;
+
+  @Before
+  public void beforeTest() throws Exception {
+    feederActorSystem = ActorSystem.create("feederActorSystem",
+      getFeederActorConfig());
+
+    sourceContext = new DummySourceContext();
+
+    sourceThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SourceFunction.SourceContext<Object> sourceContext =
+            new DummySourceContext();
+          source.run(sourceContext);
+        } catch (Exception e) {
+          exception = e;
+        }
+      }
+    });
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    feederActorSystem.shutdown();
+    feederActorSystem.awaitTermination();
+
+    source.cancel();
+    sourceThread.join();
+  }
+
+  @Test
+  public void testWithSingleData() throws Exception {
+    source = new AkkaTestSource(sourceConfiguration);
+
+    feederActorSystem.actorOf(
+      Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA),
+      feederActorName);
+
+    source.autoAck = false;
+    source.open(config);
+    sourceThread.start();
+
+    while (DummySourceContext.numElementsCollected != 1) {
+      Thread.sleep(5);
+    }
+    List<Object> message = DummySourceContext.message;
+    Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE);
+  }
+
+  @Test
+  public void testWithIterableData() throws Exception {
+    source = new AkkaTestSource(sourceConfiguration);
+
+    feederActorSystem.actorOf(
+      Props.create(FeederActor.class, FeederActor.MessageTypes.ITERABLE_DATA),
+      feederActorName);
+
+    source.autoAck = false;
+    source.open(config);
+    sourceThread.start();
+
+    while (DummySourceContext.numElementsCollected != 2) {
+      Thread.sleep(5);
+    }
+
+    List<Object> messages = DummySourceContext.message;
+    Assert.assertEquals(messages.get(0).toString(), Message.WELCOME_MESSAGE);
+    Assert.assertEquals(messages.get(1).toString(), Message.FEEDER_MESSAGE);
+  }
+
+  @Test
+  public void testWithByteArrayData() throws Exception {
+    source = new AkkaTestSource(sourceConfiguration);
+
+    feederActorSystem.actorOf(
+      Props.create(FeederActor.class, FeederActor.MessageTypes.BYTES_DATA),
+      feederActorName);
+
+    source.autoAck = false;
+    source.open(config);
+    sourceThread.start();
+
+    while (DummySourceContext.numElementsCollected != 1) {
+      Thread.sleep(5);
+    }
+
+    List<Object> message = DummySourceContext.message;
+    if (message.get(0) instanceof byte[]) {
+      byte[] data = (byte[]) message.get(0);
+      Assert.assertEquals(new String(data), Message.WELCOME_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testWithSingleDataWithTimestamp() throws Exception {
+    source = new AkkaTestSource(sourceConfiguration);
+
+    feederActorSystem.actorOf(
+      Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA_WITH_TIMESTAMP),
+      feederActorName);
+
+    source.autoAck = false;
+    source.open(config);
+    sourceThread.start();
+
+    while (DummySourceContext.numElementsCollected != 1) {
+      Thread.sleep(5);
+    }
+
+    List<Object> message = DummySourceContext.message;
+    Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE);
+  }
+
+  @Test
+  public void testAcksWithSingleData() throws Exception {
+    sourceConfiguration = sourceConfiguration.withValue("akka.remote.auto-ack",
+      ConfigValueFactory.fromAnyRef("on"));
+    source = new AkkaTestSource(sourceConfiguration);
+
+    feederActorSystem.actorOf(
+      Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA),
+      feederActorName);
+
+    source.open(config);
+    sourceThread.start();
+
+    while (DummySourceContext.numElementsCollected != 1) {
+      Thread.sleep(5);
+    }
+
+    int noOfRetries = 1;
+    while (Message.ACK_MESSAGE == null && noOfRetries <= 5) {
+      Thread.sleep(5);
+      noOfRetries++;
+    }
+    Assert.assertEquals("ack", Message.ACK_MESSAGE);
+  }
+
+  private class AkkaTestSource extends AkkaSource {
+
+    private AkkaTestSource(Config sourceConfig) {
+      super(receiverActorName, urlOfFeeder, sourceConfig);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+      return Mockito.mock(StreamingRuntimeContext.class);
+    }
+  }
+
+  private static class DummySourceContext implements SourceFunction.SourceContext<Object> {
+    private static final Object lock = new Object();
+
+    private static long numElementsCollected;
+
+    private static List<Object> message;
+
+    private DummySourceContext() {
+      numElementsCollected = 0;
+      message = new ArrayList<Object>();
+    }
+
+    @Override
+    public void collect(Object element) {
+      message.add(element);
+      numElementsCollected++;
+    }
+
+    @Override
+    public void collectWithTimestamp(Object element, long timestamp) {
+      message.add(element);
+      numElementsCollected++;
+    }
+
+    @Override
+    public void emitWatermark(Watermark mark) {
+
+    }
+
+    @Override
+    public Object getCheckpointLock() {
+      return lock;
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
+
+  private Config getFeederActorConfig() {
+    String configFile = getClass().getClassLoader()
+      .getResource("feeder_actor.conf").getFile();
+    Config config = ConfigFactory.parseFile(new File(configFile));
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java
new file mode 100644
index 0000000..40d4694
--- /dev/null
+++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FeederActor extends UntypedActor {
+
+  public enum MessageTypes {
+    SINGLE_DATA, ITERABLE_DATA, BYTES_DATA,
+    SINGLE_DATA_WITH_TIMESTAMP
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(FeederActor.class);
+
+  private final MessageTypes messageType;
+
+  public FeederActor(MessageTypes messageType) {
+    this.messageType = messageType;
+  }
+
+  @Override
+  public void onReceive(Object message) {
+    if (message instanceof SubscribeReceiver) {
+      ActorRef receiver = ((SubscribeReceiver) message).getReceiverActor();
+
+      Object data;
+      switch (messageType) {
+        case SINGLE_DATA:
+          data = createSingleDataMessage();
+          break;
+        case ITERABLE_DATA:
+          data = createIterableOfMessages();
+          break;
+        case BYTES_DATA:
+          data = createByteMessages();
+          break;
+        case SINGLE_DATA_WITH_TIMESTAMP:
+          data = createTimestampMessage();
+          break;
+        default:
+          throw new RuntimeException("Message format specified is incorrect");
+      }
+      receiver.tell(data, getSelf());
+    } else if (message instanceof String) {
+      Message.ACK_MESSAGE = message.toString();
+    } else if (message instanceof UnsubscribeReceiver) {
+      LOG.info("Stop actor!");
+    }
+  }
+
+  private Object createSingleDataMessage() {
+    return Message.WELCOME_MESSAGE;
+  }
+
+  private List<Object> createIterableOfMessages() {
+    List<Object> messages = new ArrayList<Object>();
+
+    messages.add(Message.WELCOME_MESSAGE);
+    messages.add(Message.FEEDER_MESSAGE);
+
+    return messages;
+  }
+
+  private byte[] createByteMessages() {
+    byte[] message = Message.WELCOME_MESSAGE.getBytes();
+    return message;
+  }
+
+  private Tuple2<Object, Long> createTimestampMessage() {
+    Tuple2<Object, Long> message = new Tuple2<Object, Long>();
+    message.f0 = Message.WELCOME_MESSAGE;
+    message.f1 = System.currentTimeMillis();
+
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java
new file mode 100644
index 0000000..6f70467
--- /dev/null
+++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+public class Message {
+  public static final String WELCOME_MESSAGE = "welcome receiver";
+  public static final String FEEDER_MESSAGE = "this is feeder";
+
+  public static String ACK_MESSAGE;
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/resources/feeder_actor.conf
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/resources/feeder_actor.conf b/flink-connector-akka/src/test/resources/feeder_actor.conf
new file mode 100644
index 0000000..a877aa3
--- /dev/null
+++ b/flink-connector-akka/src/test/resources/feeder_actor.conf
@@ -0,0 +1,33 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+akka {
+  loglevel = "INFO"
+  actor {
+    provider = "akka.remote.RemoteActorRefProvider"
+  }
+  remote {
+    enabled-transports = ["akka.remote.netty.tcp"]
+    netty.tcp {
+      hostname = 127.0.0.1
+      port = 5150
+    }
+    log-sent-messages = on
+    log-received-messages = on
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/resources/log4j.properties b/flink-connector-akka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c82c2c7
--- /dev/null
+++ b/flink-connector-akka/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ff6da2b..9a15ff4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
     <module>flink-connector-flume</module>
     <module>flink-connector-activemq</module>
     <module>flink-connector-netty</module>
+    <module>flink-connector-akka</module>
   </modules>
 
   <properties>