Posted to reviews@bahir.apache.org by shijinkui <gi...@git.apache.org> on 2016/10/24 11:42:24 UTC

[GitHub] bahir-flink pull request #7: [BAHIR-72] support netty: pushed tcp/http conne...

GitHub user shijinkui opened a pull request:

    https://github.com/apache/bahir-flink/pull/7

    [BAHIR-72] support netty: pushed tcp/http connector

    When the source stream starts, it listens on a provided TCP port and receives stream data from the user's data source.
    This Netty TCP source is keep-alive and end-to-end, that is, data flows from the business system directly to the Flink worker.
    1.	The source runs as a Netty TCP and HTTP server.
    2.	The user provides a TCP port; if the port is in use, the port number is incremented within the range 1024 to 65535. The source can run in parallel.
    3.	The source calls back the provided URL to report the actual port it is listening on.
    4.	The user pushes streaming data to the Netty server, which then collects the data into Flink.
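
The port selection in step 2 can be sketched as below. This is a minimal illustration, not the connector's code: it uses a plain `java.net.ServerSocket` instead of Netty, and `PortProbe`, `bindFirstFree`, and the default of 128 retries are assumed names and values chosen for the example.

```scala
import java.net.{BindException, ServerSocket}

// Hypothetical helper, not part of the pull request.
object PortProbe {
  private val MinPort = 1024
  private val MaxPort = 65535
  private val Span = MaxPort - MinPort + 1

  /** Bind to startPort, or to the next free port after it, wrapping inside 1024..65535. */
  def bindFirstFree(startPort: Int, maxRetries: Int = 128): ServerSocket = {
    require(startPort >= MinPort && startPort <= MaxPort,
      s"startPort must be between $MinPort and $MaxPort")
    var offset = 0
    var bound: Option[ServerSocket] = None
    while (bound.isEmpty) {
      // Stay in the unprivileged range instead of running past 65535.
      val candidate = (startPort - MinPort + offset) % Span + MinPort
      try {
        bound = Some(new ServerSocket(candidate))
      } catch {
        case e: BindException =>
          // Port is taken: give up after maxRetries, otherwise try the next one.
          if (offset >= maxRetries) throw e
          offset += 1
      }
    }
    bound.get
  }
}
```

Calling `PortProbe.bindFirstFree(20000).getLocalPort` yields the port that was actually bound, which is the value step 3 would report to the callback URL.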

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shijinkui/bahir-flink netty_connector

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir-flink/pull/7.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #7
    
----
commit a189597d11bfabb8af2fc671dd68cf8cc57d3fb2
Author: shijinkui <sh...@huawei.com>
Date:   2016-10-24T11:40:24Z

    [BAHIR-72] support netty: pushed tcp/http connector
    When the source stream starts, it listens on a provided TCP port and receives stream data from the user's data source.
    This Netty TCP source is keep-alive and end-to-end, that is, data flows from the business system directly to the Flink worker.
    1.	The source runs as a Netty TCP and HTTP server.
    2.	The user provides a TCP port; if the port is in use, the port number is incremented within the range 1024 to 65535. The source can run in parallel.
    3.	The source calls back the provided URL to report the actual port it is listening on.
    4.	The user pushes streaming data to the Netty server, which then collects the data into Flink.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] bahir-flink issue #7: [BAHIR-72][bahir-flink] support netty: pushed tcp/http...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/7
  
    Thanks a lot for the updates. I was traveling for the last few weeks, which is why I didn't have time to review your changes earlier.
    
    I'll merge the PR.



[GitHub] bahir-flink issue #7: [BAHIR-72] support netty: pushed tcp/http connector

Posted by shijinkui <gi...@git.apache.org>.
Github user shijinkui commented on the issue:

    https://github.com/apache/bahir-flink/pull/7
  
    @rmetzger any problem?



[GitHub] bahir-flink pull request #7: [BAHIR-72] support netty: pushed tcp/http conne...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/7#discussion_r84674575
  
    --- Diff: flink-connector-netty/pom.xml ---
    @@ -0,0 +1,87 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  ~ Licensed to the Apache Software Foundation (ASF) under one or more
    +  ~ contributor license agreements.  See the NOTICE file distributed with
    +  ~ this work for additional information regarding copyright ownership.
    +  ~ The ASF licenses this file to You under the Apache License, Version 2.0
    +  ~ (the "License"); you may not use this file except in compliance with
    +  ~ the License.  You may obtain a copy of the License at
    +  ~
    +  ~    http://www.apache.org/licenses/LICENSE-2.0
    +  ~
    +  ~ Unless required by applicable law or agreed to in writing, software
    +  ~ distributed under the License is distributed on an "AS IS" BASIS,
    +  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  ~ See the License for the specific language governing permissions and
    +  ~ limitations under the License.
    +  -->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <modelVersion>4.0.0</modelVersion>
    +
    +  <parent>
    +    <groupId>org.apache.bahir</groupId>
    +    <artifactId>bahir-flink_parent_2.11</artifactId>
    +    <version>1.0.0-SNAPSHOT</version>
    +    <relativePath>..</relativePath>
    +  </parent>
    +
    +  <artifactId>flink-connector-netty_2.11</artifactId>
    +  <name>flink-connector-netty</name>
    +  <version>1.0.0-SNAPSHOT</version>
    +  <packaging>jar</packaging>
    +
    +  <dependencies>
    +    <dependency>
    +      <groupId>io.netty</groupId>
    +      <artifactId>netty-all</artifactId>
    +      <version>4.1.5.Final</version>
    +    </dependency>
    +    <dependency>
    +      <groupId>org.apache.flink</groupId>
    +      <artifactId>flink-table_${scala.binary.version}</artifactId>
    +      <version>${flink.version}</version>
    +    </dependency>
    --- End diff --
    
    I don't think it's a good idea to add the Table API here as a dependency just for one connector example.
    
    Flink Table could grow quite big in the future, and users of the connector will not execute a single line of the Table API.



[GitHub] bahir-flink pull request #7: [BAHIR-72][bahir-flink] support netty: pushed t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir-flink/pull/7



[GitHub] bahir-flink issue #7: [BAHIR-72][bahir-flink] support netty: pushed tcp/http...

Posted by ckadner <gi...@git.apache.org>.
Github user ckadner commented on the issue:

    https://github.com/apache/bahir-flink/pull/7
  
    Sadly we did not have *Scalatest* enabled at the time this PR was reviewed, so we missed adding automated unit tests.
    
    I opened [BAHIR-113: Flink Netty connector missing automated unit tests](https://issues.apache.org/jira/browse/BAHIR-113) to keep track of that.
    
    @shijinkui -- would you be willing to take that on and open a PR for [BAHIR-113](https://issues.apache.org/jira/browse/BAHIR-113)?



[GitHub] bahir-flink pull request #7: [BAHIR-72] support netty: pushed tcp/http conne...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/7#discussion_r84675183
  
    --- Diff: flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.netty.example
    +
    +import java.io.{BufferedReader, InputStreamReader}
    +import java.net._
    +
    +import org.apache.commons.lang3.SystemUtils
    +import org.mortbay.util.MultiException
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * Netty Utility class for start netty service and retry tcp port
    + */
    +object NettyUtil {
    +  private lazy val logger = LoggerFactory.getLogger(getClass)
    +
    +  /** find local inet addresses */
    +  def findLocalInetAddress(): InetAddress = {
    +
    +    val address = InetAddress.getLocalHost
    +    address.isLoopbackAddress match {
    +      case true =>
    +        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
    +        // a better address using the local network interfaces
    +        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
    +        // on unix-like system. On windows, it returns in index order.
    +        // It's more proper to pick ip address following system output order.
    +        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
    +        val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
    +          case true => activeNetworkIFs
    +          case false => activeNetworkIFs.reverse
    +        }
    +
    +        reOrderedNetworkIFs.find { ni: NetworkInterface =>
    +          val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
    +            addr.isLinkLocalAddress || addr.isLoopbackAddress
    +          }
    +          addr.nonEmpty
    +        } match {
    +          case Some(ni) =>
    +            val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet =>
    +              inet.isLinkLocalAddress || inet.isLoopbackAddress
    +            }
    +            val address = addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
    +            // because of Inet6Address.toHostName may add interface at the end if it knows about it
    +            InetAddress.getByAddress(address)
    +          case None => address
    +        }
    +      case false => address
    +    }
    +  }
    +
    +  /** start service, if port is collision, retry 128 times */
    +  def startServiceOnPort[T](
    +    startPort: Int,
    +    startService: Int => T,
    +    maxRetries: Int = 128,
    +    serviceName: String = ""): T = {
    +
    +    if (startPort != 0 && (startPort < 1024 || startPort > 65536)) {
    +      throw new Exception("startPort should be between 1024 and 65535 (inclusive), " +
    +        "or 0 for a random free port.")
    +    }
    +
    +    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    +    for (offset <- 0 to maxRetries) {
    +      // Do not increment port if startPort is 0, which is treated as a special port
    +      val tryPort = if (startPort == 0) {
    +        startPort
    +      } else {
    +        // If the new port wraps around, do not try a privilege port
    +        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
    +      }
    +
    +      try {
    +        val result = startService(tryPort)
    +        logger.info(s"Successfully started service$serviceString, result:$result.")
    +        return result
    +      } catch {
    +        case e: Exception if isBindCollision(e) =>
    +          if (offset >= maxRetries) {
    +            val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
    +              s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
    +              s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
    +              "port or increasing spark.port.maxRetries."
    --- End diff --
    
    The exception message doesn't seem to be relevant to Flink.
    
    In general, the code here seems to be copied from Apache Spark: https://github.com/apache/spark/blob/39755169fb5bb07332eef263b4c18ede1528812d/core/src/main/scala/org/apache/spark/util/Utils.scala#L2172
    
    Can you add comments to the code copied from other projects?
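
One way the two requests above could be addressed is sketched below: the port wrap-around is kept but carries an attribution comment, and the failure message no longer mentions Spark settings. This is only an illustration under assumptions. `PortRetry`, `candidatePort`, `failureMessage`, and the wording of the message are hypothetical and are not taken from the pull request.

```scala
// Hypothetical sketch, not the code that was merged.
object PortRetry {
  private val MinPort = 1024
  private val MaxPort = 65535
  private val Span = MaxPort - MinPort + 1 // number of unprivileged ports

  // Wrap-around logic adapted from Apache Spark's Utils.scala
  // (see the link in the review comment above).
  /** Port to try for a given retry offset; 0 is passed through as "any free port". */
  def candidatePort(startPort: Int, offset: Int): Int =
    if (startPort == 0) 0
    else (startPort - MinPort + offset) % Span + MinPort

  /** Failure message that does not refer to Spark configuration keys. */
  def failureMessage(cause: String, serviceName: String, maxRetries: Int): String = {
    val service = if (serviceName.isEmpty) "" else s" '$serviceName'"
    s"$cause: Service$service failed after $maxRetries retries! " +
      "Consider choosing a different start port or increasing the retry limit."
  }
}
```

For example, `PortRetry.candidatePort(65535, 1)` wraps to 1024 rather than to a privileged port, and `PortRetry.candidatePort(0, 5)` stays 0.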



[GitHub] bahir-flink pull request #7: [BAHIR-72] support netty: pushed tcp/http conne...

Posted by shijinkui <gi...@git.apache.org>.
Github user shijinkui commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/7#discussion_r84816760
  
    --- Diff: flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala ---
    --- End diff --
    
    @rmetzger OK, added

