Posted to commits@bahir.apache.org by rm...@apache.org on 2016/11/23 10:44:49 UTC

bahir-flink git commit: [BAHIR-72] support netty: pushed tcp/http connector

Repository: bahir-flink
Updated Branches:
  refs/heads/master 6b0f6820c -> 2d1225f9a


[BAHIR-72] support netty: pushed tcp/http connector
When the source stream starts, it listens on a provided TCP port and receives stream data pushed from the user's data source.
The Netty TCP source keeps the connection alive and is end-to-end, that is, data flows from the business system to the Flink worker directly.
1.	The source runs as a Netty TCP or HTTP server.
2.	The user provides a TCP port; if the port is in use, the port number is increased within the range 1024 to 65535. The source can run in parallel.
3.	The source calls back the provided URL to report the actual port it listens on.
4.	The user pushes streaming data to the Netty server, which then collects the data into Flink.

This closes #7


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/2d1225f9
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/2d1225f9
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/2d1225f9

Branch: refs/heads/master
Commit: 2d1225f9a3c5401c48267b5bcef49b19254ea3aa
Parents: 6b0f682
Author: shijinkui <sh...@huawei.com>
Authored: Mon Oct 24 19:40:24 2016 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Nov 23 11:44:37 2016 +0100

----------------------------------------------------------------------
 flink-connector-netty/README.md                 |  65 ++++++++
 flink-connector-netty/pom.xml                   |  87 ++++++++++
 .../connectors/netty/example/HttpHandler.scala  |  82 ++++++++++
 .../netty/example/HttpReceiverSource.scala      |  51 ++++++
 .../connectors/netty/example/HttpServer.scala   |  90 ++++++++++
 .../netty/example/LineParserTrait.scala         |  28 ++++
 .../connectors/netty/example/NettyUtil.scala    | 164 +++++++++++++++++++
 .../connectors/netty/example/ServerTrait.scala  |  50 ++++++
 .../connectors/netty/example/TcpHandler.scala   |  45 +++++
 .../netty/example/TcpReceiverSource.scala       |  55 +++++++
 .../connectors/netty/example/TcpServer.scala    | 124 ++++++++++++++
 .../src/test/resources/log4j-test.properties    |  38 +++++
 .../src/test/resources/logback-test.xml         |  29 ++++
 .../connectors/netty/example/BaseTest.scala     |  84 ++++++++++
 .../netty/example/HttpSourceExample.scala       |  50 ++++++
 .../netty/example/MonitorServer.scala           | 105 ++++++++++++
 .../connectors/netty/example/NettyClient.scala  |  72 ++++++++
 .../netty/example/NettyClientHandler.scala      |  49 ++++++
 .../netty/example/StreamSqlExample.scala        |  77 +++++++++
 .../netty/example/TcpSourceExample.scala        |  50 ++++++
 pom.xml                                         |   6 +
 scalastyle-config.xml                           |   2 +-
 22 files changed, 1402 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-netty/README.md b/flink-connector-netty/README.md
new file mode 100644
index 0000000..b081f23
--- /dev/null
+++ b/flink-connector-netty/README.md
@@ -0,0 +1,65 @@
+#   Flink Netty Connector
+
+This connector provides a TCP source and an HTTP source for receiving pushed data, implemented with [Netty](http://netty.io).
+
+##  Data Flow
+
+```
++-------------+      (2)    +------------------------+
+| user system |    <-----   | Third Register Service |           
++-------------+             +------------------------+
+       |                                ^
+       | (3)                            |
+       |                                |
+       V                                |
++--------------------+                  |
+| Flink Netty Source |  ----------------+
++--------------------+         (1)
+```
+
+There are three components:
+
+*   User System - where the streaming data comes from
+*   Third Register Service - receives the `Flink Netty Source`'s registration request (ip and port)
+*   Flink Netty Source - Netty server that receives streaming data pushed from the `User System`
+
+
+##   Maven Dependency
+To use this connector, add the following dependency to your project:
+
+```
+<dependency>
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>flink-connector-netty_2.11</artifactId>
+  <version>1.0</version>
+</dependency>
+```
+
+##  Usage
+
+*Tcp Source:*
+
+```
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.addSource(new TcpReceiverSource(7070, Some("http://localhost:9090/cb")))
+```
+>tryPort:   try to use this port; if it is already in use, try another port
+>callbackUrl:   register the connector's ip and port with a `Third Register Service`
+
+*Http Source:*
+
+```
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.addSource(new HttpReceiverSource("msg", 7070, Some("http://localhost:9090/cb")))
+```
+>paramKey:  the http query param key
+>tryPort:   try to use this port; if it is already in use, try another port
+>callbackUrl:   register the connector's ip and port with a `Third Register Service`
+
+##  Full Examples
+
+There are two examples to get started:
+
+*   [StreamSqlExample](https://github.com/apache/bahir-flink/blob/master/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala)
+*   [TcpSourceExample](https://github.com/apache/bahir-flink/blob/master/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/TcpSourceExample.scala)
+
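
The `Third Register Service` in the README's data flow can be pictured with a minimal sketch. It assumes the callback carries `ip` and `port` as query parameters, as `ServerTrait.register` in this commit builds them. The object name `RegisterServiceSketch`, the `/cb` path, and the use of the JDK's built-in `com.sun.net.httpserver` are illustrative assumptions and not part of the connector.

```scala
import java.net.{InetSocketAddress, URLDecoder}
import java.util.concurrent.ConcurrentLinkedQueue

import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

// Hypothetical stand-in for the "Third Register Service"; not part of the connector.
object RegisterServiceSketch {
  // "ip:port" pairs reported by sources, in arrival order.
  val registered = new ConcurrentLinkedQueue[String]()

  /** Parse a raw query such as "ip=10.0.0.1&port=7070"; entries without '=' are skipped. */
  def parseQuery(rawQuery: String): Map[String, String] =
    Option(rawQuery).getOrElse("").split("&").toSeq.flatMap { kv =>
      kv.split("=", 2) match {
        case Array(k, v) => Some(k -> URLDecoder.decode(v, "UTF-8"))
        case _ => None
      }
    }.toMap

  /** Start the service on `port` (0 picks a free port); callers stop it with `stop(0)`. */
  def start(port: Int): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress(port), 0)
    server.createContext("/cb", new HttpHandler {
      override def handle(ex: HttpExchange): Unit = {
        val params = parseQuery(ex.getRequestURI.getRawQuery)
        // Record the source only when both parameters are present.
        for (ip <- params.get("ip"); p <- params.get("port")) {
          registered.add(s"$ip:$p")
        }
        val body = "ok".getBytes("UTF-8")
        ex.sendResponseHeaders(200, body.length)
        ex.getResponseBody.write(body)
        ex.close()
      }
    })
    server.start()
    server
  }
}
```

A user system could then read `RegisterServiceSketch.registered` to learn where each parallel source instance is listening before it starts pushing data.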

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-netty/pom.xml b/flink-connector-netty/pom.xml
new file mode 100644
index 0000000..ba700c8
--- /dev/null
+++ b/flink-connector-netty/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-flink_parent_2.11</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-connector-netty_2.11</artifactId>
+  <name>flink-connector-netty</name>
+  <version>1.0.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.1.5.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.4</version>
+    </dependency>
+
+    <!--test dependencies-->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>19.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.alibaba</groupId>
+      <artifactId>fastjson</artifactId>
+      <version>1.2.16</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpHandler.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpHandler.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpHandler.scala
new file mode 100644
index 0000000..8eb3ea1
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpHandler.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import io.netty.buffer.Unpooled
+import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, ChannelInboundHandlerAdapter}
+import io.netty.handler.codec.http.HttpResponseStatus._
+import io.netty.handler.codec.http.HttpVersion._
+import io.netty.handler.codec.http._
+import io.netty.util.AsciiString
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.slf4j.LoggerFactory
+
+/**
+ * HTTP server handler that processes HTTP requests
+ *
+ * @param sc       Flink source context for collecting received messages
+ * @param paramKey the http query param key
+ */
+class HttpHandler(
+  sc: SourceContext[String],
+  paramKey: String
+) extends ChannelInboundHandlerAdapter {
+
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+  private lazy val CONTENT_TYPE = new AsciiString("Content-Type")
+  private lazy val CONTENT_LENGTH = new AsciiString("Content-Length")
+
+  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = ctx.flush
+
+  override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
+    msg match {
+      case req: HttpRequest =>
+        if (HttpUtil.is100ContinueExpected(req)) {
+          ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE))
+        }
+
+        val keepAlive: Boolean = HttpUtil.isKeepAlive(req)
+        if (!keepAlive) {
+          ctx.writeAndFlush(buildResponse()).addListener(ChannelFutureListener.CLOSE)
+        } else {
+          val decoder = new QueryStringDecoder(req.uri)
+          val param: java.util.Map[String, java.util.List[String]] = decoder.parameters()
+          if (param.containsKey(paramKey)) {
+            sc.collect(param.get(paramKey).get(0))
+          }
+          ctx.writeAndFlush(buildResponse())
+        }
+      case x =>
+        logger.info("unsupported request format " + x)
+    }
+  }
+
+  private def buildResponse(content: Array[Byte] = Array.empty[Byte]): FullHttpResponse = {
+    val response: FullHttpResponse = new DefaultFullHttpResponse(
+      HTTP_1_1, OK, Unpooled.wrappedBuffer(content)
+    )
+    response.headers.set(CONTENT_TYPE, "text/plain")
+    response.headers.setInt(CONTENT_LENGTH, response.content.readableBytes)
+    response
+  }
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+    logger.error("channel exception " + ctx.channel().toString, cause)
+    ctx.close
+  }
+
+}
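
As written above, `HttpHandler` collects the value of the `paramKey` query parameter only when the request is keep-alive. A minimal client sketch under that reading follows; `HttpPushSketch`, `pushUrl`, and `push` are hypothetical names, since the commit ships no client API, and the sketch relies on `HttpURLConnection` requesting a persistent connection, which JDK clients normally do by default.

```scala
import java.net.{HttpURLConnection, URL, URLEncoder}

// Hypothetical client helper; the connector itself ships no client API.
object HttpPushSketch {
  /** Build the request URL carrying `payload` under the query key `paramKey`. */
  def pushUrl(host: String, port: Int, paramKey: String, payload: String): String = {
    val key = URLEncoder.encode(paramKey, "UTF-8")
    val value = URLEncoder.encode(payload, "UTF-8")
    s"http://$host:$port/?$key=$value"
  }

  /** Send one message with a GET request and return the HTTP status code. */
  def push(host: String, port: Int, paramKey: String, payload: String): Int = {
    val con = new URL(pushUrl(host, port, paramKey, payload))
      .openConnection().asInstanceOf[HttpURLConnection]
    con.setRequestMethod("GET")
    try con.getResponseCode finally con.disconnect()
  }
}
```

With a source created as `new HttpReceiverSource("msg", 7070, ...)`, a call such as `HttpPushSketch.push("localhost", 7070, "msg", "hello world")` would be expected to deliver `hello world` to the stream, provided the source actually bound port 7070.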

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpReceiverSource.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpReceiverSource.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpReceiverSource.scala
new file mode 100644
index 0000000..1fc0392
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpReceiverSource.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+
+/**
+ * HTTP receiver source, used for receiving pushed HTTP requests.
+ * It works in two steps:
+ * 1. start a netty server on an unused port when the Flink job starts
+ * 2. once netty has started, call back [[callbackUrl]] to register the current connector
+ * with the message service, so that users can push http messages to this address.
+ * {{{
+ *   // for example:
+ *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+ *   env.addSource(new HttpReceiverSource("msg", 7070, Some("http://localhost:9090/cb")))
+ * }}}
+ *
+ * @param paramKey    the http query param key
+ * @param tryPort     try to use this port; if it is already in use, try another port
+ * @param callbackUrl register the connector's ip and port with a third-party service
+ */
+final class HttpReceiverSource(
+  paramKey: String,
+  tryPort: Int,
+  callbackUrl: Option[String] = None
+) extends RichParallelSourceFunction[String] {
+  private var server: HttpServer = _
+
+  override def cancel(): Unit = server.close()
+
+  override def run(ctx: SourceContext[String]): Unit = {
+    server = new HttpServer(ctx, paramKey)
+    server.start(tryPort, callbackUrl)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpServer.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpServer.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpServer.scala
new file mode 100644
index 0000000..68799d0
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpServer.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.net.InetSocketAddress
+import java.util.concurrent.atomic.AtomicBoolean
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.channel.{Channel, ChannelInitializer, ChannelOption}
+import io.netty.handler.codec.http.HttpServerCodec
+import io.netty.handler.logging.{LogLevel, LoggingHandler}
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.slf4j.LoggerFactory
+
+/**
+ * netty http server
+ *
+ * @param ctx       Flink source context for collecting received messages
+ * @param paramKey  the http query param key
+ * @param threadNum number of threads in the netty boss event loop group
+ * @param logLevel  netty log level
+ */
+class HttpServer(
+  ctx: SourceContext[String],
+  paramKey: String,
+  threadNum: Int = Runtime.getRuntime.availableProcessors(),
+  logLevel: LogLevel = LogLevel.INFO
+) extends ServerTrait {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+  private lazy val bossGroup = new NioEventLoopGroup(threadNum)
+  private lazy val workerGroup = new NioEventLoopGroup
+  private lazy val isRunning = new AtomicBoolean(false)
+
+  private var currentAddr: InetSocketAddress = _
+
+  override def close(): Unit = {
+    bossGroup.shutdownGracefully()
+    workerGroup.shutdownGracefully()
+    logger.info("successfully close netty server source")
+  }
+
+  def startNettyServer(
+    portNotInUse: Int,
+    callbackUrl: Option[String]
+  ): InetSocketAddress = synchronized {
+
+    if (!isRunning.get()) {
+      val b: ServerBootstrap = new ServerBootstrap
+      b
+        .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 1024)
+        .group(bossGroup, workerGroup)
+        .channel(classOf[NioServerSocketChannel])
+        .handler(new LoggingHandler(logLevel))
+        .childHandler(new ChannelInitializer[SocketChannel] {
+          override def initChannel(ch: SocketChannel): Unit = {
+            val p = ch.pipeline()
+            p.addLast(new HttpServerCodec)
+            p.addLast(new HttpHandler(ctx, paramKey))
+          }
+        })
+      val f = b.bind(portNotInUse)
+      f.syncUninterruptibly()
+      val ch: Channel = f.channel()
+      isRunning.set(true)
+      currentAddr = ch.localAddress().asInstanceOf[InetSocketAddress]
+      register(currentAddr, callbackUrl)
+      ch.closeFuture().sync()
+      currentAddr
+    } else {
+      currentAddr
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/LineParserTrait.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/LineParserTrait.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/LineParserTrait.scala
new file mode 100644
index 0000000..83f4a67
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/LineParserTrait.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+/**
+ * parse data line
+ */
+trait PayloadParse[S, T] {
+  def parse(line: S): T
+}
+
+trait StringParser[T] extends PayloadParse[String, T]
+
+trait BytesParser[T] extends PayloadParse[Array[Byte], T]
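
To show how the parser traits above might be used, here is a small sketch. `PayloadParse` and `StringParser` are mirrored from the diff so the block compiles on its own; the `Reading` record and `ReadingParser` are hypothetical and exist only for illustration.

```scala
// Traits mirrored from the diff so the sketch compiles on its own.
trait PayloadParse[S, T] {
  def parse(line: S): T
}

trait StringParser[T] extends PayloadParse[String, T]

// Hypothetical record type and parser, for illustration only.
final case class Reading(sensor: String, value: Double)

object ReadingParser extends StringParser[Option[Reading]] {
  /** Parse "sensor,value"; malformed lines yield None instead of throwing. */
  override def parse(line: String): Option[Reading] =
    line.split(",", -1) match {
      case Array(sensor, raw) if sensor.nonEmpty =>
        scala.util.Try(raw.trim.toDouble).toOption.map(Reading(sensor.trim, _))
      case _ => None
    }
}
```

Returning `Option` keeps a single bad line from failing the whole stream; a downstream operator can simply drop the `None` values.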

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
new file mode 100644
index 0000000..e6e189f
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net._
+
+import org.apache.commons.lang3.SystemUtils
+import org.mortbay.util.MultiException
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Netty utility class for starting a netty service and retrying tcp ports
+ */
+object NettyUtil {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  /** find local inet addresses */
+  def findLocalInetAddress(): InetAddress = {
+
+    val address = InetAddress.getLocalHost
+    address.isLoopbackAddress match {
+      case true =>
+        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+        // a better address using the local network interfaces
+        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
+        // on unix-like system. On windows, it returns in index order.
+        // It's more proper to pick ip address following system output order.
+        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
+        val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
+          case true => activeNetworkIFs
+          case false => activeNetworkIFs.reverse
+        }
+
+        reOrderedNetworkIFs.find { ni: NetworkInterface =>
+          val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
+            addr.isLinkLocalAddress || addr.isLoopbackAddress
+          }
+          addr.nonEmpty
+        } match {
+          case Some(ni) =>
+            val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet =>
+              inet.isLinkLocalAddress || inet.isLoopbackAddress
+            }
+            val address = addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
+            // because of Inet6Address.toHostName may add interface at the end if it knows about it
+            InetAddress.getByAddress(address)
+          case None => address
+        }
+      case false => address
+    }
+  }
+
+  /**
+   * Start the service; if the port is already in use, retry up to 128 times.
+   * Tip: this function is copied from Spark: org.apache.spark.util.Utils.scala#L2172
+   * It is a better way to retry until an unused port is found.
+   */
+  def startServiceOnPort[T](
+    startPort: Int,
+    startService: Int => T,
+    maxRetries: Int = 128,
+    serviceName: String = ""): T = {
+
+    if (startPort != 0 && (startPort < 1024 || startPort > 65535)) {
+      throw new Exception("startPort should be between 1024 and 65535 (inclusive), " +
+        "or 0 for a random free port.")
+    }
+
+    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    for (offset <- 0 to maxRetries) {
+      // Do not increment port if startPort is 0, which is treated as a special port
+      val tryPort = if (startPort == 0) {
+        startPort
+      } else {
+        // If the new port wraps around, do not try a privilege port
+        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
+      }
+
+      try {
+        val result = startService(tryPort)
+        logger.info(s"Successfully started service$serviceString, result:$result.")
+        return result
+      } catch {
+        case e: Exception if isBindCollision(e) =>
+          if (offset >= maxRetries) {
+            val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
+              s"$maxRetries retries! Consider explicitly setting the port for the " +
+              s"service$serviceString to an available port or increasing the " +
+              "maximum number of retries."
+            val exception = new BindException(exceptionMessage)
+            // restore original stack trace
+            exception.setStackTrace(e.getStackTrace)
+            throw exception
+          }
+          logger.error(s"Service$serviceString could not bind on port $tryPort. " +
+            s"Attempting port ${tryPort + 1}.")
+      }
+    }
+    // Should never happen
+    throw new Exception(s"Failed to start service$serviceString on port $startPort")
+  }
+
+  /** send GET request to this url */
+  def sendGetRequest(url: String): String = {
+    val obj: URL = new URL(url)
+    val con: HttpURLConnection = obj.openConnection.asInstanceOf[HttpURLConnection]
+    con.setRequestMethod("GET")
+    val code = try {
+      con.getResponseCode
+    } catch {
+      case e: Throwable => e
+    }
+
+    code match {
+      case HttpURLConnection.HTTP_OK =>
+        val in: BufferedReader = new BufferedReader(new InputStreamReader(con.getInputStream))
+        var inputLine: String = ""
+        val response: StringBuilder = new StringBuilder
+        try {
+          while (inputLine != null) {
+            response.append(inputLine)
+            inputLine = in.readLine()
+          }
+          in.close()
+        } catch {
+          case throwable: Exception =>
+        }
+        response.toString
+      case x => throw new Exception("GET request failed for url: " + url)
+    }
+  }
+
+  /** Return whether the exception is caused by an address-port collision when binding. */
+  private def isBindCollision(exception: Throwable): Boolean = {
+    exception match {
+      case e: BindException if e.getMessage != null => true
+      case e: BindException => isBindCollision(e.getCause)
+      case e: MultiException =>
+        e.getThrowables.asScala.toList.map(_.asInstanceOf[Throwable]).exists(isBindCollision)
+      case e: Exception => isBindCollision(e.getCause)
+      case _ => false
+    }
+  }
+
+}
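
The port arithmetic in `startServiceOnPort` can be isolated into a small sketch to make the wrap-around visible. The object and function names below are illustrative and do not appear in the commit; the formula itself is the one used in the loop above.

```scala
// Illustrative helper; the name `candidatePort` does not appear in the commit.
object PortRetrySketch {
  private val MinPort = 1024
  private val PortSpace = 65536 - MinPort // number of non-privileged ports

  /** Port tried on attempt `offset` (0-based) when starting from `startPort`. */
  def candidatePort(startPort: Int, offset: Int): Int =
    if (startPort == 0) 0 // 0 asks the OS for any free port, so it is never incremented
    else ((startPort + offset - MinPort) % PortSpace) + MinPort
}
```

Starting from 65535, the next candidate wraps to 1024 rather than running past the valid range or into the privileged ports below 1024.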

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/ServerTrait.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/ServerTrait.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/ServerTrait.scala
new file mode 100644
index 0000000..ac2e76b
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/ServerTrait.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.io.Closeable
+import java.net.{InetSocketAddress, URLEncoder}
+
+/**
+ * Server trait that defines server behavior.
+ * A port to try and an optional callback url must be supplied.
+ */
+trait ServerTrait extends Closeable {
+
+  def start(tryPort: Int, callbackUrl: Option[String]): InetSocketAddress = {
+    NettyUtil.startServiceOnPort(tryPort, (p: Int) => startNettyServer(p, callbackUrl))
+  }
+
+  def startNettyServer(portNotInUse: Int, callbackUrl: Option[String]): InetSocketAddress
+
+  def register(address: InetSocketAddress, callbackUrl: Option[String]): Unit = {
+    callbackUrl match {
+      case Some(url) =>
+        val ip = address.getAddress.getHostAddress
+        val newIp = if (ip.startsWith("0") || ip.startsWith("127")) {
+          NettyUtil.findLocalInetAddress().getHostAddress
+        } else {
+          ip
+        }
+        val port = address.getPort
+        val param = s"ip=${URLEncoder.encode(newIp, "UTF-8")}&port=$port"
+        val callUrl = if (url.endsWith("?")) param else "?" + param
+        NettyUtil.sendGetRequest(url + callUrl)
+      case _ =>
+    }
+  }
+}
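
The URL that `register` ends up requesting can be sketched as a pure function. `CallbackUrlSketch` and `callbackUrlFor` are hypothetical names; the logic follows the `register` method above, leaving out the substitution of loopback or wildcard addresses.

```scala
import java.net.URLEncoder

// Illustrative helper; `callbackUrlFor` is a hypothetical name, not part of the commit.
object CallbackUrlSketch {
  /** Append ip and port to the callback URL the same way `register` does. */
  def callbackUrlFor(url: String, ip: String, port: Int): String = {
    val param = s"ip=${URLEncoder.encode(ip, "UTF-8")}&port=$port"
    val separator = if (url.endsWith("?")) "" else "?"
    url + separator + param
  }
}
```

Under this logic, a callback URL that already carries a query string would end up with two `?` characters, so a bare endpoint such as `http://localhost:9090/cb` is the safer input.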

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpHandler.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpHandler.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpHandler.scala
new file mode 100644
index 0000000..4ccabe6
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpHandler.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.netty.example
+
+import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.slf4j.LoggerFactory
+
+/**
+ * Processes the netty stream data and adds it to the Flink source context.
+ */
+private class TcpHandler(sctx: SourceContext[String]) extends SimpleChannelInboundHandler[String] {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  override def channelRead0(ctx: ChannelHandlerContext, msg: String): Unit = {
+    sctx.collect(msg)
+  }
+
+  override def channelActive(ctx: ChannelHandlerContext): Unit = {
+    logger.info(s"tcp channel active, remote address:${ctx.channel().remoteAddress()}")
+  }
+
+  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = ctx.flush
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+    logger.error(s"netty server channel ${ctx.channel()} error", cause)
+    ctx.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpReceiverSource.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpReceiverSource.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpReceiverSource.scala
new file mode 100644
index 0000000..9d87f8b
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpReceiverSource.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.netty.example
+
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+
+/**
+ * An end-to-end source that builds a keep-alive tcp channel with netty.
+ *
+ * When this source stream starts, it listens on the provided tcp port and receives stream data
+ * sent directly from the place where the original data is generated.
+ * {{{
+ *   // for example:
+ *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+ *   env.addSource(new TcpReceiverSource(7070, Some("http://localhost:9090/cb")))
+ * }}}
+ * The features provided by this source:
+ * 1. the source runs as a netty tcp server
+ * 2. it listens on the provided tcp port; if the port is in use,
+ * it increases the port number within the range 1024 to 65535
+ * 3. it calls back the provided url to report the real port it listens on
+ *
+ * @param tryPort     the tcp port to start on; on a port collision, a new port is retried
+ * @param callbackUrl when the netty server has started, report the ip and port to this url
+ */
+final class TcpReceiverSource(
+  tryPort: Int,
+  callbackUrl: Option[String] = None
+) extends RichParallelSourceFunction[String] {
+  private var server: TcpServer = _
+
+  override def cancel(): Unit = if (server != null) server.close()
+
+  override def run(ctx: SourceContext[String]): Unit = {
+    server = TcpServer(tryPort, ctx)
+    server.start(tryPort, callbackUrl)
+  }
+}
\ No newline at end of file
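
The TcpServer pipeline in this commit frames incoming bytes with DelimiterBasedFrameDecoder and Delimiters.lineDelimiter(), so a data source has to terminate every record with a newline. A minimal sketch of such a producer with plain java.net.Socket is given here. The name LinePushClient is hypothetical and not part of the commit, and host and port are assumed to be the values the source reported to the callback url.

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Hypothetical producer, not part of the commit.
object LinePushClient {
  /** Sends each record as one newline-terminated UTF-8 line over a single tcp connection. */
  def push(host: String, port: Int, records: Seq[String]): Unit = {
    val socket = new Socket(host, port)
    try {
      val out = new BufferedWriter(
        new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8))
      records.foreach { record =>
        out.write(record)
        out.write("\n") // the line delimiter marks the end of one frame
      }
      out.flush()
    } finally {
      socket.close()
    }
  }
}
```

A call such as LinePushClient.push("127.0.0.1", 7070, Seq("1,abc,10", "2,def,20")) would deliver two records, each handed to TcpHandler as a separate String.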

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpServer.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpServer.scala b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpServer.scala
new file mode 100644
index 0000000..fec74ce
--- /dev/null
+++ b/flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpServer.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.net.InetSocketAddress
+import java.util.concurrent.atomic.AtomicBoolean
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption, ChannelPipeline}
+import io.netty.handler.codec.string.{StringDecoder, StringEncoder}
+import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
+import io.netty.handler.logging.{LogLevel, LoggingHandler}
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.slf4j.LoggerFactory
+
+/**
+ * Netty server bootstrap with a user-provided tcp port.
+ * - receives streaming data
+ * - adds it to the Flink [[SourceContext]]
+ *
+ * @param tryPort     the first port to try
+ * @param ctx         flink source context that collects the data received by netty
+ * @param tcpOpts     tcp options for the netty server
+ * @param threadNum   thread number for netty, default is the number of processors on this machine
+ * @param maxFrameLen max netty frame length
+ * @param logLevel    netty log level
+ */
+class TcpServer(
+  tryPort: Int,
+  ctx: SourceContext[String],
+  tcpOpts: ServerBootstrap => Unit,
+  threadNum: Int = Runtime.getRuntime.availableProcessors(),
+  maxFrameLen: Int = 8192,
+  logLevel: LogLevel = LogLevel.INFO
+) extends ServerTrait {
+
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+  private lazy val bossGroup = new NioEventLoopGroup(threadNum)
+  private lazy val workerGroup = new NioEventLoopGroup
+  private lazy val isRunning = new AtomicBoolean(false)
+
+  private var currentAddr: InetSocketAddress = _
+
+  def startNettyServer(
+    portNotInUse: Int,
+    callbackUrl: Option[String]
+  ): InetSocketAddress = synchronized {
+    if (!isRunning.get()) {
+
+      val server = new ServerBootstrap
+      val bootstrap: ServerBootstrap = server
+        .group(bossGroup, workerGroup)
+        .channel(classOf[NioServerSocketChannel])
+        .childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+        .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
+
+      tcpOpts(bootstrap)
+
+      val bootWithHandler = bootstrap
+        .handler(new LoggingHandler(logLevel))
+        .childHandler(new ChannelInitializer[SocketChannel]() {
+          def initChannel(ch: SocketChannel): Unit = {
+            val p: ChannelPipeline = ch.pipeline
+            p.addLast(new DelimiterBasedFrameDecoder(maxFrameLen, Delimiters.lineDelimiter(): _*))
+            p.addLast(new StringEncoder())
+            p.addLast(new StringDecoder())
+            p.addLast(new TcpHandler(ctx))
+          }
+        })
+
+      // Start the server.
+      val f: ChannelFuture = bootWithHandler.bind(portNotInUse)
+      f.syncUninterruptibly()
+      currentAddr = f.channel().localAddress().asInstanceOf[InetSocketAddress]
+      logger.info(s"start tcp server on address: $currentAddr")
+      isRunning.set(true)
+      register(currentAddr, callbackUrl)
+      f.channel().closeFuture().sync()
+      currentAddr
+    } else {
+      logger.info(s"server is already running on address: $currentAddr, no need to start it again")
+      currentAddr
+    }
+  }
+
+  override def close(): Unit = {
+    bossGroup.shutdownGracefully()
+    workerGroup.shutdownGracefully()
+    logger.info("shutting down netty server source")
+  }
+}
+
+object TcpServer {
+
+  def apply(
+    tryPort: Int,
+    ctx: SourceContext[String],
+    threadNum: Int = Runtime.getRuntime.availableProcessors(),
+    maxFrameLen: Int = 8192,
+    logLevel: LogLevel = LogLevel.INFO
+  ): TcpServer = {
+    val tcpOptions = (bootstrap: ServerBootstrap) => {}
+    new TcpServer(tryPort, ctx, tcpOptions, threadNum, maxFrameLen, logLevel)
+  }
+}
\ No newline at end of file
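
ServerTrait.start passes tryPort to NettyUtil.startServiceOnPort, which is not shown in this part of the diff. According to the commit message, the port number is increased within 1024 to 65535 when the requested port is in use. The following is only a sketch of that retry idea under stated assumptions, not the NettyUtil implementation: the names PortRetrySketch, nextPort and startOnPort, the wrap-around rule, and the retry limit are all hypothetical.

```scala
import java.net.BindException
import scala.annotation.tailrec

// Hypothetical sketch, not the NettyUtil code from this commit.
object PortRetrySketch {
  /** Next candidate port; wraps from 65535 back to 1024 (assumed rule). */
  def nextPort(port: Int): Int = if (port >= 65535) 1024 else port + 1

  /** Calls `start` with `port`; on a BindException retries with the next candidate port. */
  @tailrec
  def startOnPort[T](port: Int, retriesLeft: Int)(start: Int => T): T = {
    val attempt: Either[BindException, T] =
      try Right(start(port)) catch { case e: BindException => Left(e) }
    attempt match {
      case Right(value) => value
      case Left(e) if retriesLeft <= 0 => throw e // give up and surface the last bind error
      case Left(_) => startOnPort(nextPort(port), retriesLeft - 1)(start)
    }
  }
}
```

With this shape, a caller could write PortRetrySketch.startOnPort(7070, 16)(p => new java.net.ServerSocket(p)) and read the port that was actually bound from the returned socket, which is the value the source would report to the callback url.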

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/resources/log4j-test.properties b/flink-connector-netty/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..398a590
--- /dev/null
+++ b/flink-connector-netty/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.flink.util.MavenForkNumberPrefixLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/resources/logback-test.xml b/flink-connector-netty/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..74cf542
--- /dev/null
+++ b/flink-connector-netty/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] [%X{sourceThread} - %X{akkaSource}] %-5level %logger{60} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="WARN">
+    <appender-ref ref="STDOUT"/>
+  </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/BaseTest.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/BaseTest.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/BaseTest.scala
new file mode 100644
index 0000000..069f957
--- /dev/null
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/BaseTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.util
+import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
+
+import com.alibaba.fastjson.JSONObject
+import org.apache.http.NameValuePair
+import org.apache.http.client.entity.UrlEncodedFormEntity
+import org.apache.http.client.methods.{HttpGet, HttpPost}
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.message.BasicNameValuePair
+import org.apache.http.util.EntityUtils
+import org.slf4j.LoggerFactory
+
+/** base test util */
+class BaseTest {
+  lazy val logger = LoggerFactory.getLogger(getClass)
+  private lazy val httpclient = HttpClients.createDefault()
+  private lazy val queue = new LinkedBlockingQueue[JSONObject]()
+  private lazy val schedule = Executors.newScheduledThreadPool(20)
+  private lazy val pool = Executors.newCachedThreadPool()
+
+
+  def schedule(period: Int, f: () => Unit): Unit = {
+    schedule.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = {
+        f.apply()
+      }
+    }, 3, period, TimeUnit.SECONDS)
+  }
+
+  def run(f: () => Unit): Unit = {
+    pool.submit(new Runnable {
+      override def run(): Unit = {
+        f.apply()
+      }
+    })
+  }
+
+  def sendGetRequest(url: String): String = {
+    val httpGet = new HttpGet(url)
+    val response1 = httpclient.execute(httpGet)
+    try {
+      logger.info(s"response: ${response1.getStatusLine}, url:$url")
+      val entity = response1.getEntity
+      EntityUtils.toString(entity)
+    } finally {
+      response1.close()
+    }
+  }
+
+  def sendPostRequest(url: String, map: Map[String, String]): String = {
+    val httpPost = new HttpPost(url)
+    val nvps = new util.ArrayList[NameValuePair]()
+    map.foreach { kv =>
+      nvps.add(new BasicNameValuePair(kv._1, kv._2))
+    }
+    httpPost.setEntity(new UrlEncodedFormEntity(nvps))
+    val response = httpclient.execute(httpPost)
+    try {
+      logger.info("response status line:" + response.getStatusLine)
+      val entity2 = response.getEntity
+      EntityUtils.toString(entity2)
+    } finally {
+      response.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/HttpSourceExample.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/HttpSourceExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/HttpSourceExample.scala
new file mode 100644
index 0000000..7459e9a
--- /dev/null
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/HttpSourceExample.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.net.URLEncoder
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
+import com.alibaba.fastjson.JSONObject
+
+import scala.util.Random
+
+/**
+ * http client
+ * Created by shijinkui on 2016/9/26.
+ */
+object HttpSourceExample extends BaseTest {
+
+  def main(args: Array[String]): Unit = {
+    val queue = new LinkedBlockingQueue[JSONObject]()
+
+    run(() => new MonitorServer(queue).start(9090))
+    run(() => StreamSqlExample.main(Array("--http", "true")))
+
+    Thread.sleep(5000)
+
+    while (true) {
+      val json = queue.poll(Int.MaxValue, TimeUnit.SECONDS)
+      logger.info("====request register from netty http source: " + json)
+      val url = s"http://${json.getString("ip")}:${json.getString("port")}/payload?msg="
+      schedule(5, () => {
+        val line = s"${Random.nextInt(5)},abc,${Random.nextInt(100)}"
+        sendGetRequest(url + URLEncoder.encode(line, "UTF-8"))
+      })
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/MonitorServer.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/MonitorServer.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/MonitorServer.scala
new file mode 100644
index 0000000..fa5c270
--- /dev/null
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/MonitorServer.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.net.URLDecoder
+import java.util.concurrent.LinkedBlockingQueue
+
+import com.alibaba.fastjson.JSONObject
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.Unpooled
+import io.netty.channel._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.handler.codec.http._
+import io.netty.handler.logging.{LogLevel, LoggingHandler}
+import org.slf4j.LoggerFactory
+
+class MonitorServer(queue: LinkedBlockingQueue[JSONObject]) {
+  def start(port: Int): Unit = {
+    // Configure the server.
+    val bossGroup: EventLoopGroup = new NioEventLoopGroup(1)
+    val workerGroup: EventLoopGroup = new NioEventLoopGroup
+    val b: ServerBootstrap = new ServerBootstrap
+    b.option[java.lang.Integer](ChannelOption.SO_BACKLOG, 1024)
+    b
+      .group(bossGroup, workerGroup)
+      .channel(classOf[NioServerSocketChannel])
+      .handler(new LoggingHandler(LogLevel.INFO))
+      .childHandler(new ChannelInitializer[SocketChannel] {
+        override def initChannel(ch: SocketChannel): Unit = {
+          val p = ch.pipeline()
+          p.addLast(new HttpServerCodec)
+          p.addLast(new MonitorHandler(queue))
+        }
+      })
+    val f = b.bind(port).syncUninterruptibly()
+    //    println("Open your web browser and navigate to http://127.0.0.1:" + port + '/')
+    f.channel().closeFuture().sync()
+  }
+}
+
+
+class MonitorHandler(list: LinkedBlockingQueue[JSONObject]) extends ChannelInboundHandlerAdapter {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = ctx.flush
+
+  override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
+    msg match {
+      case req: HttpRequest =>
+        val uri = req.uri()
+        //  add to queue
+        if (uri != "/favicon.ico") {
+          list.put(parseCallback(uri))
+        }
+        logger.info("received data: " + uri)
+
+        val ack: Array[Byte] = Array('O', 'K')
+        val response: FullHttpResponse = new DefaultFullHttpResponse(
+          HttpVersion.HTTP_1_1,
+          HttpResponseStatus.OK,
+          Unpooled.wrappedBuffer(ack)
+        )
+        response.headers.set("Content-Type", "text/plain")
+        response.headers.set("Content-Length", response.content.readableBytes)
+        ctx.write(response).addListener(ChannelFutureListener.CLOSE)
+      case _ =>
+    }
+  }
+
+  private def parseCallback(line: String): JSONObject = {
+    val obj = new JSONObject()
+    if (line.startsWith("/cb?")) {
+      // the callback uri looks like /cb?ip=...&port=...
+      line.substring(4).split("&").foreach { f =>
+        // split into key and value; skip malformed pairs without a value
+        val tp = f.split("=", 2)
+        if (tp.length == 2) obj.put(tp.head, URLDecoder.decode(tp(1), "UTF-8"))
+      }
+    } else {
+      logger.info("ignore uri without callback parameters: " + line)
+    }
+    obj
+  }
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+    cause.printStackTrace()
+    ctx.close
+  }
+}
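
MonitorHandler.parseCallback above turns a callback uri such as /cb?ip=...&port=... into key/value pairs. A dependency-free sketch of the same parsing is given here, returning an immutable Map instead of a fastjson JSONObject. The name CallbackQuery is hypothetical and not part of the commit; skipping pairs without "=" is a choice made for this sketch.

```scala
import java.net.URLDecoder

// Hypothetical helper, not part of the commit.
object CallbackQuery {
  /** Parses "/cb?k1=v1&k2=v2" into a map; returns an empty map for any other uri. */
  def parse(uri: String, prefix: String = "/cb?"): Map[String, String] =
    if (!uri.startsWith(prefix)) {
      Map.empty
    } else {
      uri.substring(prefix.length).split("&").iterator
        .map(_.split("=", 2)) // limit 2 keeps any "=" inside the value
        .collect { case Array(k, v) if k.nonEmpty => k -> URLDecoder.decode(v, "UTF-8") }
        .toMap
    }
}
```

CallbackQuery.parse("/cb?ip=10.0.0.5&port=7070") gives Map("ip" -> "10.0.0.5", "port" -> "7070"), and a request for /favicon.ico gives an empty map.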

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClient.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClient.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClient.scala
new file mode 100644
index 0000000..60707cf
--- /dev/null
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClient.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import io.netty.bootstrap.Bootstrap
+import io.netty.channel._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioSocketChannel
+import io.netty.handler.codec.string.{StringDecoder, StringEncoder}
+import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
+import io.netty.handler.logging.{LogLevel, LoggingHandler}
+import org.slf4j.LoggerFactory
+
+private class NettyClient(host: String, port: Int) extends Thread {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  private lazy val group: EventLoopGroup = new NioEventLoopGroup
+  private var ch: Channel = _
+
+  def shutdown(): Unit = {
+    group.shutdownGracefully()
+  }
+
+  def send(line: String): Unit = {
+    if (ch != null && ch.isActive) {
+      ch.writeAndFlush(line + "\n")
+      logger.info("client send msg: "
+        + s"${ch.isActive} ${ch.isOpen}  ${ch.isRegistered} ${ch.isWritable}")
+    } else {
+      logger.info("client fail send msg, " + Option(ch).fold("channel is not connected")(c =>
+        s"${c.isActive} ${c.isOpen}  ${c.isRegistered} ${c.isWritable}"))
+    }
+  }
+
+  override def run(): Unit = {
+
+    val b: Bootstrap = new Bootstrap
+    b.group(group)
+      .channel(classOf[NioSocketChannel])
+      .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
+      .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+      .handler(new ChannelInitializer[SocketChannel]() {
+        def initChannel(ch: SocketChannel): Unit = {
+          val p: ChannelPipeline = ch.pipeline
+          p.addLast(new LoggingHandler(LogLevel.INFO))
+          p.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter(): _*))
+          p.addLast(new StringEncoder())
+          p.addLast(new StringDecoder())
+          p.addLast(new NettyClientHandler)
+        }
+      })
+    // Start the client.
+    val f: ChannelFuture = b.connect(host, port).sync()
+    ch = f.channel()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClientHandler.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClientHandler.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClientHandler.scala
new file mode 100644
index 0000000..0e5642b
--- /dev/null
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/NettyClientHandler.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import io.netty.channel._
+import org.slf4j.LoggerFactory
+
+import scala.util.Random
+
+
+final class NettyClientHandler extends SimpleChannelInboundHandler[String] {
+  private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  override def channelActive(ctx: ChannelHandlerContext): Unit = {
+    val ch = ctx.channel()
+    logger.info(s"active channel: $ch")
+  }
+
+  override def channelInactive(ctx: ChannelHandlerContext): Unit = {
+    val ch = ctx.channel()
+    logger.info(s"inactive channel: $ch")
+  }
+
+  override def channelRead0(ctx: ChannelHandlerContext, msg: String): Unit = {
+    logger.info("receive message:" + msg)
+    ctx.writeAndFlush(s"${Random.nextLong()},sjk,${Random.nextInt()}\n")
+  }
+
+  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = ctx.flush
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+    cause.printStackTrace()
+    ctx.close
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
new file mode 100644
index 0000000..da36ff4
--- /dev/null
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/StreamSqlExample.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+ * Simple example for demonstrating the use of SQL on a Stream Table.
+ *
+ * This example shows how to:
+ *  - Convert DataStreams to Tables
+ *  - Register a Table under a name
+ *  - Run a StreamSQL query on the registered Table
+ */
+object StreamSqlExample {
+
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
+
+  def main(args: Array[String]): Unit = {
+    val param = ParameterTool.fromArgs(args)
+
+    // set up execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val spec = if (param.get("tcp") == "true") {
+      new TcpReceiverSource(7070, Some("http://localhost:9090/cb"))
+    } else {
+      new HttpReceiverSource("msg", 7070, Some("http://localhost:9090/cb"))
+    }
+
+    val orderA: DataStream[Order] = env
+      .addSource(spec)
+      .setParallelism(3)
+      .map { line =>
+        val tk = line.split(",")
+        Order(tk.head.trim.toLong, tk(1), tk(2).trim.toInt)
+      }
+    // register the DataStream under the name "OrderA"
+    tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
+
+    // select the orders whose amount is greater than 2
+    val result = tEnv.sql("SELECT STREAM * FROM OrderA WHERE amount > 2")
+
+    result.toDataStream[Order].print()
+
+    env.execute()
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Order(user: Long, product: String, amount: Int)
+
+}
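
The map step in StreamSqlExample splits each incoming line on commas and converts the first and third fields with toLong and toInt, so a malformed line would throw inside the job. A minimal sketch of a more defensive parser follows. It assumes the "user,product,amount" line format that the example clients send. OrderParser is a hypothetical helper that is not part of this commit, and unlike the original map it rejects lines that do not have exactly three fields.

```scala
import scala.util.Try

// Mirrors the Order case class defined in StreamSqlExample.
final case class Order(user: Long, product: String, amount: Int)

// Hypothetical helper, not part of the commit.
object OrderParser {
  // Returns None instead of throwing when the line is malformed.
  def parse(line: String): Option[Order] =
    line.split(",", -1) match {
      // Exactly three fields: user id, product name, amount.
      case Array(user, product, amount) =>
        Try(Order(user.trim.toLong, product, amount.trim.toInt)).toOption
      case _ => None
    }
}
```

In the job, something like flatMap(line => OrderParser.parse(line).toList) could stand in for the map call, so that malformed lines are dropped rather than failing the stream. This is a suggestion under the assumptions above, not the behavior of the committed example.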

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/TcpSourceExample.scala
----------------------------------------------------------------------
diff --git a/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/TcpSourceExample.scala b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/TcpSourceExample.scala
new file mode 100644
index 0000000..2955c0a
--- /dev/null
+++ b/flink-connector-netty/src/test/scala/org/apache/flink/streaming/connectors/netty/example/TcpSourceExample.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.netty.example
+
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
+import com.alibaba.fastjson.JSONObject
+
+import scala.util.Random
+
+object TcpSourceExample extends BaseTest {
+
+  def main(args: Array[String]): Unit = {
+    val queue = new LinkedBlockingQueue[JSONObject]()
+
+    //  1.  start the monitor server, which waits for the flink netty source to register
+    run(() => new MonitorServer(queue).start(9090))
+    //  2.  start flink job
+    run(() => StreamSqlExample.main(Array("--tcp", "true")))
+
+    Thread.sleep(5000)
+
+    //  3.  send messages to the netty source continuously
+    while (true) {
+      logger.info("==============")
+      val json = queue.poll(Int.MaxValue, TimeUnit.SECONDS)
+      logger.info("====request register from netty tcp source: " + json)
+      val client = new NettyClient("localhost", json.getInteger("port"))
+      client.run()
+      schedule(5, () => {
+        val line = Random.nextLong() + ",sjk," + Random.nextInt()
+        client.send(line)
+      })
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 31af3a3..ff6da2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@
     <module>flink-connector-redis</module>
     <module>flink-connector-flume</module>
     <module>flink-connector-activemq</module>
+    <module>flink-connector-netty</module>
   </modules>
 
   <properties>
@@ -164,6 +165,11 @@
     <pluginManagement>
       <plugins>
         <plugin>
+          <groupId>net.alchim31.maven</groupId>
+          <artifactId>scala-maven-plugin</artifactId>
+          <version>3.2.2</version>
+        </plugin>
+        <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-enforcer-plugin</artifactId>
           <version>1.4.1</version>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d1225f9/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 1db5977..d1c95cd 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -214,7 +214,7 @@ This file is divided into 3 sections:
     <parameters>
       <parameter name="groups">java,scala,3rdParty,spark</parameter>
       <parameter name="group.java">javax?\..*</parameter>
-      <parameter name="group.scala">scala\..*</parameter>
+      <!--<parameter name="group.scala">scala\..*</parameter>-->
       <parameter name="group.3rdParty">(?!org\.apache\.spark\.).*</parameter>
       <parameter name="group.spark">org\.apache\.spark\..*</parameter>
     </parameters>