Posted to dev@flink.apache.org by Itay Sali <it...@Skyhawk.security> on 2023/02/21 06:20:50 UTC

Connecting two jobs with socket over AWS EMR - How to define the IP?

Hi,

I am building a streaming pipeline with two jobs that I want to connect through a socket (we plan to replace it with Kafka later).
The jobs are submitted to an AWS EMR cluster with this configuration:

    {
      "Classification": "flink-conf",
      "Properties": {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk",
        "env.java.home": "/usr/lib/jvm/java-11-openjdk",
        "high-availability": "zookeeper",
        "high-availability.storageDir": "hdfs:///user/flink/recovery",
        "high-availability.zookeeper.path.root": "/flink",
        "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
        "java.home": "/usr/lib/jvm/java-11-openjdk",
        "taskmanager.data.port": "35001",
        "taskmanager.numberOfTaskSlots": "2",
        "yarn.application-attempts": "10"
      }
    },


I am probably missing how to define the host when I use writeToSocket and socketTextStream in my jobs.
Is it a configuration setting, or should it be the IP of one of the primary/core nodes? I have tried many options and none of the messages went through.

Any help would be appreciated.

Thanks,
Itay Sali

Re: Connecting two jobs with socket over AWS EMR - How to define the IP?

Posted by Samrat Deb <de...@gmail.com>.
Hi Itay,

`writeToSocket` takes the hostname, port, and serialization schema as
arguments.
I think you can pass the hostname of one of the core nodes, the one where
you intend to view the messages, in the Flink job.
Have you tried that?
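
As far as I know, both `writeToSocket` and `socketTextStream` open an outgoing
TCP connection to the given hostname and port, so something must already be
listening there and the port must be reachable from the node running the task.
A minimal sketch of a reachability check in plain Java (not Flink API; the
class name `SocketCheck` and the method `canConnect` are made up for
illustration):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class SocketCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, unknown host, timeout, and similar failures.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Local demo only: port 0 lets the OS pick a free port.
        try (ServerSocket listener = new ServerSocket(0)) {
            int port = listener.getLocalPort();
            System.out.println("reachable: " + canConnect("127.0.0.1", port, 1000));
        }
    }
}
```

On the cluster you would call `canConnect` with the core node's hostname and
the port you pass to `writeToSocket`, from the node where the job runs.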

Can you add more details?
1. Which Flink version are you using?
2. If you are getting an exception, can you add the error logs?
3. Have you verified that your source is generating new records while you
are trying to view them on the socket?
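
For point 3, one way to see what actually arrives is to run a small listener
on the target node, similar in spirit to `nc -lk <port>`. A rough sketch in
plain Java, assuming newline-delimited text records; the class name
`LineListener`, the method `receiveLines`, and the default port 9999 are
hypothetical:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineListener {
    /** Accepts one client and returns up to maxLines text lines it sends. */
    static List<String> receiveLines(ServerSocket server, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket client = server.accept();
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            // Stop at maxLines or when the client closes the connection.
            while (lines.size() < maxLines && (line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999; // hypothetical default
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port);
            // Prints after 10 lines have arrived or the sender disconnects.
            for (String line : receiveLines(server, 10)) {
                System.out.println(line);
            }
        }
    }
}
```

If nothing is printed while the job is running, the sink is either not
producing records or not reaching this host and port.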

Best,
Samrat
