You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2016/11/24 08:52:30 UTC
bahir-flink git commit: [BAHIR-73][bahir-flink] flink-streaming-akka
source connector
Repository: bahir-flink
Updated Branches:
refs/heads/master 2d1225f9a -> a830077a9
[BAHIR-73][bahir-flink] flink-streaming-akka source connector
This closes #8
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/a830077a
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/a830077a
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/a830077a
Branch: refs/heads/master
Commit: a830077a9b8145d5fb0718619b7055c8d7d1b14e
Parents: 2d1225f
Author: Subhobrata Dey <sb...@gmail.com>
Authored: Tue Oct 25 16:21:07 2016 -0400
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Nov 24 09:51:22 2016 +0100
----------------------------------------------------------------------
flink-connector-akka/README.md | 37 +++
flink-connector-akka/pom.xml | 85 ++++++
.../streaming/connectors/akka/AkkaSource.java | 147 +++++++++++
.../streaming/connectors/akka/package-info.java | 21 ++
.../connectors/akka/utils/ReceiverActor.java | 110 ++++++++
.../akka/utils/SubscribeReceiver.java | 58 +++++
.../akka/utils/UnsubscribeReceiver.java | 58 +++++
.../connectors/akka/AkkaSourceTest.java | 256 +++++++++++++++++++
.../connectors/akka/utils/FeederActor.java | 99 +++++++
.../connectors/akka/utils/Message.java | 25 ++
.../src/test/resources/feeder_actor.conf | 33 +++
.../src/test/resources/log4j.properties | 27 ++
pom.xml | 1 +
13 files changed, 957 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-akka/README.md b/flink-connector-akka/README.md
new file mode 100644
index 0000000..b3b64e5
--- /dev/null
+++ b/flink-connector-akka/README.md
@@ -0,0 +1,37 @@
+# Flink Akka connector
+
+This connector provides a sink to [Akka](http://akka.io/) source actors in an ActorSystem.
+To use this connector, add the following dependency to your project:
+
+
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>flink-connector-akka_2.11</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+
+*Version Compatibility*: This module is compatible with Akka 2.0+.
+
+## Configuration
+
+The configurations for the Receiver Actor System in Flink Akka connector can be created using the standard typesafe `Config (com.typesafe.config.Config)` object.
+
+To enable acknowledgements, the custom configuration `akka.remote.auto-ack` can be used.
+
+The user can set any of the default configurations allowed by Akka as well as custom configurations allowed by the connector.
+
+A sample configuration can be defined as follows:
+
+ String configFile = getClass().getClassLoader()
+ .getResource("feeder_actor.conf").getFile();
+ Config config = ConfigFactory.parseFile(new File(configFile));
+
+## Message Types
+
+There are 3 different kind of message types which the receiver Actor in flink akka connector can receive.
+
+- message containing `Iterable<Object>` data
+
+- message containing generic `Object` data
+
+- message containing generic `Object` data and a `Timestamp` value passed as `Tuple2<Object, Long>`.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-akka/pom.xml b/flink-connector-akka/pom.xml
new file mode 100644
index 0000000..df82563
--- /dev/null
+++ b/flink-connector-akka/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink_parent_2.11</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-akka_2.11</artifactId>
+ <name>flink-connector-akka</name>
+ <url>http://bahir.apache.org/</url>
+ <packaging>jar</packaging>
+
+ <properties>
+ <scala.binary.version>2.11</scala.binary.version>
+ <mockito.version>1.10.19</mockito.version>
+ <akka.version>2.3.15</akka.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
new file mode 100644
index 0000000..3925d0b
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/AkkaSource.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.akka.utils.ReceiverActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to read messages
+ * from Akka actors.
+ */
+public class AkkaSource extends RichSourceFunction<Object>
+ implements StoppableFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AkkaSource.class);
+
+ private static final long serialVersionUID = 1L;
+
+ // --- Fields set by the constructor
+
+ private final Class<?> classForActor;
+
+ private final String actorName;
+
+ private final String urlOfPublisher;
+
+ private final Config configuration;
+
+ // --- Runtime fields
+ private transient ActorSystem receiverActorSystem;
+ private transient ActorRef receiverActor;
+
+ protected transient boolean autoAck;
+
+ /**
+ * Creates {@link AkkaSource} for Streaming
+ *
+ * @param actorName Receiver Actor name
+ * @param urlOfPublisher tcp url of the publisher or feeder actor
+ */
+ public AkkaSource(String actorName,
+ String urlOfPublisher,
+ Config configuration) {
+ super();
+ this.classForActor = ReceiverActor.class;
+ this.actorName = actorName;
+ this.urlOfPublisher = urlOfPublisher;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ receiverActorSystem = createDefaultActorSystem();
+
+ if (configuration.hasPath("akka.remote.auto-ack") &&
+ configuration.getString("akka.remote.auto-ack").equals("on")) {
+ autoAck = true;
+ } else {
+ autoAck = false;
+ }
+ }
+
+ @Override
+ public void run(SourceFunction.SourceContext<Object> ctx) throws Exception {
+ LOG.info("Starting the Receiver actor {}", actorName);
+ receiverActor = receiverActorSystem.actorOf(
+ Props.create(classForActor, ctx, urlOfPublisher, autoAck), actorName);
+
+ LOG.info("Started the Receiver actor {} successfully", actorName);
+ receiverActorSystem.awaitTermination();
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closing source");
+ if (receiverActorSystem != null) {
+ receiverActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ receiverActorSystem.shutdown();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Cancelling akka source");
+ close();
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stopping akka source");
+ close();
+ }
+
+ /**
+ * Creates an actor system with default configurations for Receiver actor.
+ *
+ * @return Actor System instance with default configurations
+ */
+ private ActorSystem createDefaultActorSystem() {
+ String defaultActorSystemName = "receiver-actor-system";
+
+ Config finalConfig = getOrCreateMandatoryProperties(configuration);
+
+ return ActorSystem.create(defaultActorSystemName, finalConfig);
+ }
+
+ private Config getOrCreateMandatoryProperties(Config properties) {
+ if (!properties.hasPath("akka.actor.provider")) {
+ properties = properties.withValue("akka.actor.provider",
+ ConfigValueFactory.fromAnyRef("akka.remote.RemoteActorRefProvider"));
+ }
+
+ if (!properties.hasPath("akka.remote.enabled-transports")) {
+ properties = properties.withValue("akka.remote.enabled-transports",
+ ConfigValueFactory.fromAnyRef(Collections.singletonList("akka.remote.netty.tcp")));
+ }
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java
new file mode 100644
index 0000000..ecea4c4
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Akka receiver for Flink Streaming.
+ */
+package org.apache.flink.streaming.connectors.akka;
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java
new file mode 100644
index 0000000..09913a0
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/ReceiverActor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+import java.util.Iterator;
+
+/**
+ * Generalized receiver actor which receives messages
+ * from the feeder or publisher actor.
+ */
+public class ReceiverActor extends UntypedActor {
+ // --- Fields set by the constructor
+ private final SourceContext<Object> ctx;
+
+ private final String urlOfPublisher;
+
+ private final boolean autoAck;
+
+ // --- Runtime fields
+ private ActorSelection remotePublisher;
+
+ public ReceiverActor(SourceContext<Object> ctx,
+ String urlOfPublisher,
+ boolean autoAck) {
+ this.ctx = ctx;
+ this.urlOfPublisher = urlOfPublisher;
+ this.autoAck = autoAck;
+ }
+
+ @Override
+ public void preStart() throws Exception {
+ remotePublisher = getContext().actorSelection(urlOfPublisher);
+ remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onReceive(Object message)
+ throws Exception {
+ if (message instanceof Iterable) {
+ collect((Iterable<Object>) message);
+ } else if (message instanceof Tuple2) {
+ Tuple2<Object, Long> messageTuple = (Tuple2<Object, Long>) message;
+ collect(messageTuple.f0, messageTuple.f1);
+ } else {
+ collect(message);
+ }
+
+ if (autoAck) {
+ getSender().tell("ack", getSelf());
+ }
+ }
+
+ /**
+ * To handle {@link Iterable} data
+ *
+ * @param data data received from feeder actor
+ */
+ private void collect(Iterable<Object> data) {
+ Iterator<Object> iterator = data.iterator();
+ while (iterator.hasNext()) {
+ ctx.collect(iterator.next());
+ }
+ }
+
+ /**
+ * To handle single data
+ * @param data data received from feeder actor
+ */
+ private void collect(Object data) {
+ ctx.collect(data);
+ }
+
+ /**
+ * To handle data with timestamp
+ *
+ * @param data data received from feeder actor
+ * @param timestamp timestamp received from feeder actor
+ */
+ private void collect(Object data, long timestamp) {
+ ctx.collectWithTimestamp(data, timestamp);
+ }
+
+ @Override
+ public void postStop() throws Exception {
+ remotePublisher.tell(new UnsubscribeReceiver(ActorRef.noSender()),
+ ActorRef.noSender());
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java
new file mode 100644
index 0000000..735e3e9
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/SubscribeReceiver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+
+import java.io.Serializable;
+
+/**
+ * General interface used by Receiver Actor to subscribe
+ * to the publisher.
+ */
+public class SubscribeReceiver implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private ActorRef receiverActor;
+
+ public SubscribeReceiver(ActorRef receiverActor) {
+ this.receiverActor = receiverActor;
+ }
+
+ public void setReceiverActor(ActorRef receiverActor) {
+ this.receiverActor = receiverActor;
+ }
+
+ public ActorRef getReceiverActor() {
+ return receiverActor;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof SubscribeReceiver) {
+ SubscribeReceiver other = (SubscribeReceiver) obj;
+ return other.canEquals(this) && super.equals(other)
+ && receiverActor.equals(other.getReceiverActor());
+ } else {
+ return false;
+ }
+ }
+
+ public boolean canEquals(Object obj) {
+ return obj instanceof SubscribeReceiver;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java
new file mode 100644
index 0000000..8c6dde2
--- /dev/null
+++ b/flink-connector-akka/src/main/java/org/apache/flink/streaming/connectors/akka/utils/UnsubscribeReceiver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+
+import java.io.Serializable;
+
+/**
+ * General interface used by Receiver Actor to un subscribe.
+ */
+public class UnsubscribeReceiver implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private ActorRef receiverActor;
+
+ public UnsubscribeReceiver(ActorRef receiverActor) {
+ this.receiverActor = receiverActor;
+ }
+
+ public void setReceiverActor(ActorRef receiverActor) {
+ this.receiverActor = receiverActor;
+ }
+
+ public ActorRef getReceiverActor() {
+ return receiverActor;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof UnsubscribeReceiver) {
+ UnsubscribeReceiver other = (UnsubscribeReceiver) obj;
+ return other.canEquals(this) && super.equals(other)
+ && receiverActor.equals(other.getReceiverActor());
+ } else {
+ return false;
+ }
+ }
+
+ public boolean canEquals(Object obj) {
+ return obj instanceof UnsubscribeReceiver;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
new file mode 100644
index 0000000..e7114d7
--- /dev/null
+++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/AkkaSourceTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.akka.utils.FeederActor;
+import org.apache.flink.streaming.connectors.akka.utils.Message;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AkkaSourceTest {
+ private AkkaSource source;
+
+ private static final String feederActorName = "JavaFeederActor";
+ private static final String receiverActorName = "receiverActor";
+ private static final String urlOfFeeder =
+ "akka.tcp://feederActorSystem@127.0.0.1:5150/user/" + feederActorName;
+ private ActorSystem feederActorSystem;
+
+ private Configuration config = new Configuration();
+ private Config sourceConfiguration = ConfigFactory.empty();
+
+ private Thread sourceThread;
+
+ private SourceFunction.SourceContext<Object> sourceContext;
+
+ private volatile Exception exception;
+
+ @Before
+ public void beforeTest() throws Exception {
+ feederActorSystem = ActorSystem.create("feederActorSystem",
+ getFeederActorConfig());
+
+ sourceContext = new DummySourceContext();
+
+ sourceThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ SourceFunction.SourceContext<Object> sourceContext =
+ new DummySourceContext();
+ source.run(sourceContext);
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+ });
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ feederActorSystem.shutdown();
+ feederActorSystem.awaitTermination();
+
+ source.cancel();
+ sourceThread.join();
+ }
+
+ @Test
+ public void testWithSingleData() throws Exception {
+ source = new AkkaTestSource(sourceConfiguration);
+
+ feederActorSystem.actorOf(
+ Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA),
+ feederActorName);
+
+ source.autoAck = false;
+ source.open(config);
+ sourceThread.start();
+
+ while (DummySourceContext.numElementsCollected != 1) {
+ Thread.sleep(5);
+ }
+ List<Object> message = DummySourceContext.message;
+ Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE);
+ }
+
+ @Test
+ public void testWithIterableData() throws Exception {
+ source = new AkkaTestSource(sourceConfiguration);
+
+ feederActorSystem.actorOf(
+ Props.create(FeederActor.class, FeederActor.MessageTypes.ITERABLE_DATA),
+ feederActorName);
+
+ source.autoAck = false;
+ source.open(config);
+ sourceThread.start();
+
+ while (DummySourceContext.numElementsCollected != 2) {
+ Thread.sleep(5);
+ }
+
+ List<Object> messages = DummySourceContext.message;
+ Assert.assertEquals(messages.get(0).toString(), Message.WELCOME_MESSAGE);
+ Assert.assertEquals(messages.get(1).toString(), Message.FEEDER_MESSAGE);
+ }
+
+ @Test
+ public void testWithByteArrayData() throws Exception {
+ source = new AkkaTestSource(sourceConfiguration);
+
+ feederActorSystem.actorOf(
+ Props.create(FeederActor.class, FeederActor.MessageTypes.BYTES_DATA),
+ feederActorName);
+
+ source.autoAck = false;
+ source.open(config);
+ sourceThread.start();
+
+ while (DummySourceContext.numElementsCollected != 1) {
+ Thread.sleep(5);
+ }
+
+ List<Object> message = DummySourceContext.message;
+ if (message.get(0) instanceof byte[]) {
+ byte[] data = (byte[]) message.get(0);
+ Assert.assertEquals(new String(data), Message.WELCOME_MESSAGE);
+ }
+ }
+
+ @Test
+ public void testWithSingleDataWithTimestamp() throws Exception {
+ source = new AkkaTestSource(sourceConfiguration);
+
+ feederActorSystem.actorOf(
+ Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA_WITH_TIMESTAMP),
+ feederActorName);
+
+ source.autoAck = false;
+ source.open(config);
+ sourceThread.start();
+
+ while (DummySourceContext.numElementsCollected != 1) {
+ Thread.sleep(5);
+ }
+
+ List<Object> message = DummySourceContext.message;
+ Assert.assertEquals(message.get(0).toString(), Message.WELCOME_MESSAGE);
+ }
+
+ @Test
+ public void testAcksWithSingleData() throws Exception {
+ sourceConfiguration = sourceConfiguration.withValue("akka.remote.auto-ack",
+ ConfigValueFactory.fromAnyRef("on"));
+ source = new AkkaTestSource(sourceConfiguration);
+
+ feederActorSystem.actorOf(
+ Props.create(FeederActor.class, FeederActor.MessageTypes.SINGLE_DATA),
+ feederActorName);
+
+ source.open(config);
+ sourceThread.start();
+
+ while (DummySourceContext.numElementsCollected != 1) {
+ Thread.sleep(5);
+ }
+
+ int noOfRetries = 1;
+ while (Message.ACK_MESSAGE == null && noOfRetries <= 5) {
+ Thread.sleep(5);
+ noOfRetries++;
+ }
+ Assert.assertEquals("ack", Message.ACK_MESSAGE);
+ }
+
+ private class AkkaTestSource extends AkkaSource {
+
+ private AkkaTestSource(Config sourceConfig) {
+ super(receiverActorName, urlOfFeeder, sourceConfig);
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ return Mockito.mock(StreamingRuntimeContext.class);
+ }
+ }
+
+ private static class DummySourceContext implements SourceFunction.SourceContext<Object> {
+ private static final Object lock = new Object();
+
+ private static long numElementsCollected;
+
+ private static List<Object> message;
+
+ private DummySourceContext() {
+ numElementsCollected = 0;
+ message = new ArrayList<Object>();
+ }
+
+ @Override
+ public void collect(Object element) {
+ message.add(element);
+ numElementsCollected++;
+ }
+
+ @Override
+ public void collectWithTimestamp(Object element, long timestamp) {
+ message.add(element);
+ numElementsCollected++;
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return lock;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+ private Config getFeederActorConfig() {
+ String configFile = getClass().getClassLoader()
+ .getResource("feeder_actor.conf").getFile();
+ Config config = ConfigFactory.parseFile(new File(configFile));
+ return config;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java
new file mode 100644
index 0000000..40d4694
--- /dev/null
+++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/FeederActor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FeederActor extends UntypedActor {
+
+ public enum MessageTypes {
+ SINGLE_DATA, ITERABLE_DATA, BYTES_DATA,
+ SINGLE_DATA_WITH_TIMESTAMP
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(FeederActor.class);
+
+ private final MessageTypes messageType;
+
+ public FeederActor(MessageTypes messageType) {
+ this.messageType = messageType;
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ if (message instanceof SubscribeReceiver) {
+ ActorRef receiver = ((SubscribeReceiver) message).getReceiverActor();
+
+ Object data;
+ switch (messageType) {
+ case SINGLE_DATA:
+ data = createSingleDataMessage();
+ break;
+ case ITERABLE_DATA:
+ data = createIterableOfMessages();
+ break;
+ case BYTES_DATA:
+ data = createByteMessages();
+ break;
+ case SINGLE_DATA_WITH_TIMESTAMP:
+ data = createTimestampMessage();
+ break;
+ default:
+ throw new RuntimeException("Message format specified is incorrect");
+ }
+ receiver.tell(data, getSelf());
+ } else if (message instanceof String) {
+ Message.ACK_MESSAGE = message.toString();
+ } else if (message instanceof UnsubscribeReceiver) {
+ LOG.info("Stop actor!");
+ }
+ }
+
+ private Object createSingleDataMessage() {
+ return Message.WELCOME_MESSAGE;
+ }
+
+ private List<Object> createIterableOfMessages() {
+ List<Object> messages = new ArrayList<Object>();
+
+ messages.add(Message.WELCOME_MESSAGE);
+ messages.add(Message.FEEDER_MESSAGE);
+
+ return messages;
+ }
+
+ private byte[] createByteMessages() {
+ byte[] message = Message.WELCOME_MESSAGE.getBytes();
+ return message;
+ }
+
+ private Tuple2<Object, Long> createTimestampMessage() {
+ Tuple2<Object, Long> message = new Tuple2<Object, Long>();
+ message.f0 = Message.WELCOME_MESSAGE;
+ message.f1 = System.currentTimeMillis();
+
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java
new file mode 100644
index 0000000..6f70467
--- /dev/null
+++ b/flink-connector-akka/src/test/java/org/apache/flink/streaming/connectors/akka/utils/Message.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.akka.utils;
+
+public class Message {
+ public static final String WELCOME_MESSAGE = "welcome receiver";
+ public static final String FEEDER_MESSAGE = "this is feeder";
+
+ public static String ACK_MESSAGE;
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/resources/feeder_actor.conf
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/resources/feeder_actor.conf b/flink-connector-akka/src/test/resources/feeder_actor.conf
new file mode 100644
index 0000000..a877aa3
--- /dev/null
+++ b/flink-connector-akka/src/test/resources/feeder_actor.conf
@@ -0,0 +1,33 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+akka {
+ loglevel = "INFO"
+ actor {
+ provider = "akka.remote.RemoteActorRefProvider"
+ }
+ remote {
+ enabled-transports = ["akka.remote.netty.tcp"]
+ netty.tcp {
+ hostname = 127.0.0.1
+ port = 5150
+ }
+ log-sent-messages = on
+ log-received-messages = on
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/flink-connector-akka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connector-akka/src/test/resources/log4j.properties b/flink-connector-akka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c82c2c7
--- /dev/null
+++ b/flink-connector-akka/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/a830077a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ff6da2b..9a15ff4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<module>flink-connector-flume</module>
<module>flink-connector-activemq</module>
<module>flink-connector-netty</module>
+ <module>flink-connector-akka</module>
</modules>
<properties>