Posted to dev@flink.apache.org by Itay Sali <it...@Skyhawk.security> on 2023/02/21 12:27:15 UTC

Connecting two jobs with socket over AWS EMR - How to define the ip? - cont'

Hi Samrat,


I copied the thread as I wasn't subscribed previously.

I tried both the core node IP address and the core node host name; the result is identical in both cases.

  1.  Flink 1.5.2
  2.  I don't get any exceptions. The logs are at TRACE level and show no relevant errors. I attached the jobmanager logs; the error log is empty.
  3.  The source generates records and writes them successfully. The destination doesn't read any records.

The socket tries to connect to the private DNS name ip-10-0-102-186.ec2.internal or to 10.0.102.186.
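
Whether that address accepts TCP connections at all can be checked independently of Flink with a plain connect test. The following is only a minimal sketch and not code from either job: the class name `SocketProbe`, the port 9999, and the 3 second timeout are assumptions chosen for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class SocketProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeout, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults: the private IP from this thread and a hypothetical port.
        String host = args.length > 0 ? args[0] : "10.0.102.186";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Running it on the node where the reader or writer task executes, with the host and port passed to the job, shows whether anything is listening there.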

The IP addresses enumerated on the reader are:
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount                                  [] - Address 2 172.18.0.1
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount                                  [] - Address 2 172.17.0.1
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount                                  [] - Address 2 fe80:0:0:0:c1c:daff:fe82:c3df%eth0
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount                                  [] - Address 2 10.0.102.186
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount                                  [] - Address 2 0:0:0:0:0:0:0:1%lo
2023-02-21 10:17:53,170 INFO  org.example.WindowWordCount                                  [] - Address 2 127.0.0.1
      

The IP addresses enumerated on the writer are:
2023-02-21 10:17:58,809 INFO  org.example.Main                                             [] - Address 2 172.18.0.1
2023-02-21 10:17:58,809 INFO  org.example.Main                                             [] - Address 2 172.17.0.1
2023-02-21 10:17:58,809 INFO  org.example.Main                                             [] - Address 2 fe80:0:0:0:c1c:daff:fe82:c3df%eth0
2023-02-21 10:17:58,809 INFO  org.example.Main                                             [] - Address 2 10.0.102.186
2023-02-21 10:17:58,809 INFO  org.example.Main                                             [] - Address 2 0:0:0:0:0:0:0:1%lo
2023-02-21 10:17:58,809 INFO  org.example.Main                                             [] - Address 2 127.0.0.1
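
For reference, enumerations like the two above can be produced with `java.net.NetworkInterface`. The sketch below is an assumption about how such a listing might be generated, not the actual code from `org.example.Main` or `org.example.WindowWordCount`; the class name `AddressLister` is hypothetical.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

class AddressLister {
    /** Collects the textual form of every address bound to a local network interface. */
    static List<String> localAddresses() throws SocketException {
        List<String> result = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return result; // no interfaces reported on this machine
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                result.add(addr.getHostAddress());
            }
        }
        return result;
    }

    public static void main(String[] args) throws SocketException {
        for (String address : localAddresses()) {
            System.out.println("Address " + address);
        }
    }
}
```

Both listings in this message are identical, which suggests the reader and the writer ran on the same host (10.0.102.186) at the time they were logged.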



Hi Itay,

`writeToSocket` supports passing the hostname, port and schema as arguments. I think you can pass the hostname of one of the core nodes in the Flink job where you intend to view the messages. Have you tried that?

Can you add more details:

  1.  Which Flink version are you using?
  2.  If you are getting an exception, can you add the error logs?
  3.  Have you verified that your source is generating new records when you are trying to view records in the socket?

Bests,
Samrat



Hi,

I built a streaming pipeline and there are two jobs that I wish to connect with a socket (later we plan to use Kafka instead). The jobs are submitted to an AWS EMR cluster with this configuration:

    {
      "Classification": "flink-conf",
      "Properties": {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk",
        "env.java.home": "/usr/lib/jvm/java-11-openjdk",
        "high-availability": "zookeeper",
        "high-availability.storageDir": "hdfs:///user/flink/recovery",
        "high-availability.zookeeper.path.root": "/flink",
        "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
        "java.home": "/usr/lib/jvm/java-11-openjdk",
        "taskmanager.data.port": "35001",
        "taskmanager.numberOfTaskSlots": "2",
        "yarn.application-attempts": "10"
      }
    }

I am probably missing how to define the host when I use writeToSocket and socketTextStream from my jobs. Is it a configuration setting, or should it be one of the primary/core node IPs? I have tried many options and none of the messages went through. Any help would be appreciated.






Itay Sali