Posted to user@flink.apache.org by "Ly, The Anh" <th...@campus.tu-berlin.de> on 2018/11/01 22:33:13 UTC

Starting a separate Java process within a Flink cluster

Hello,


I am currently working on my master's and have encountered a difficult problem.


Background (for context): I am trying to connect different data stream processors. To do so, I am using Flink's mechanisms for creating custom sinks and sources to receive from and send to different data stream processors. In those custom sinks and sources I start a separate process (message-buffer-process) that they communicate with and buffer data in. My implementation is built with Maven and can be added as a dependency.


Problem: I already tested my implementation by adding it as a dependency to a simple Flink word-count example. The test ran inside an IDE, where it works perfectly fine. But when I package that Flink word-count example and try to run it with "./flink run " or by uploading and submitting it as a job, it tells me that my buffer-process class could not be found:

In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht gefunden oder geladen werden"

Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be found or loaded"


Code snippets:

Example - Adding my custom sink to send data to another data stream processor:

dataStream.addSink(
        (SinkFunction)DSPConnectorFactory
                .getInstance()
                .createSinkConnector(
                        new DSPConnectorConfig
                                .Builder("localhost", 9656)
                                .withDSP("flink")
                                .withBufferConnectorString("buffer-connection-string")
                                .withHWM(20)
                                .withTimeout(10000)
                                .build()));



The way I am trying to start the separate buffer process:

JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);

What JavaProcessBuilder.exec looks like:
public static Process exec(Class javaClass, String connectionString, boolean addSentMessagesFrame) throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome +
                File.separator + "bin" +
                File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = javaClass.getCanonicalName();

        System.out.println("Trying to build process " + classpath + " " + className);

        ProcessBuilder builder = new ProcessBuilder(
                javaBin, "-cp", classpath, className, connectionString, Boolean.toString(addSentMessagesFrame));

        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);

        Process process = builder.start();
        return process;
}

I also tried running the message-buffer process separately, from another Maven project and its packaged .jar file. That worked perfectly fine too. That is why I assume my approach is not suitable for running in Flink.
Did I miss something, or does my approach simply not work within Flink's context? I hope the information I gave is sufficient to understand my issue. If you need any more information, feel free to message me!

Thanks for any help!

 With best regards


Re: Starting a separate Java process within a Flink cluster

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

I am afraid that what you are trying to do would be extremely hard. In a
cluster setup, not all dependencies are taken from the TaskManager
classpath. The user code classes are loaded dynamically, so your new
process, which is started with only that classpath, does not have access
to them.

Best,

Dawid
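
To make the point above concrete, here is a minimal sketch of one possible workaround. It is not from the thread and has not been confirmed against a Flink cluster. The class ChildClasspath and its methods are hypothetical names. The sketch assumes the dynamically loaded user jar is a regular file on local disk, asks the JVM where a class was actually loaded from, and appends that location to the classpath handed to the child process instead of relying on "java.class.path" alone.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.security.CodeSource;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or of the original project.
final class ChildClasspath {

    private ChildClasspath() {}

    /** Returns the jar or directory the class was loaded from, or null if it cannot be determined. */
    public static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return null; // e.g. classes loaded by the bootstrap class loader
        }
        try {
            return new File(source.getLocation().toURI()).getPath();
        } catch (URISyntaxException | IllegalArgumentException e) {
            return null; // location is not a plain file: URI
        }
    }

    /** Builds a classpath for a child JVM that can load mainClass, starting from baseClasspath. */
    public static String classpathFor(Class<?> mainClass, String baseClasspath) {
        String location = locationOf(mainClass);
        if (location == null) {
            return baseClasspath;
        }
        if (baseClasspath == null || baseClasspath.isEmpty()) {
            return location;
        }
        // Do not add the location twice if it is already on the classpath.
        for (String entry : baseClasspath.split(Pattern.quote(File.pathSeparator))) {
            if (entry.equals(location)) {
                return baseClasspath;
            }
        }
        return baseClasspath + File.pathSeparator + location;
    }
}
```

In JavaProcessBuilder.exec this would replace the plain System.getProperty("java.class.path") value with ChildClasspath.classpathFor(javaClass, System.getProperty("java.class.path")). Any further dependencies of the buffer process that are also loaded dynamically would have to be covered as well, for example by packaging them into the same shaded jar.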

Re: Starting a separate Java process within a Flink cluster

Posted by Jeff Zhang <zj...@gmail.com>.
The error is most likely due to a classpath issue, because the classpath
is different when you run a Flink program in the IDE than when you run it
in a cluster.

Also, starting another JVM process in a SourceFunction doesn't seem like a
good approach to me. Is it possible for you to do this work inside your
custom SourceFunction instead?
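
As a rough illustration of keeping the buffer inside the same JVM, here is a minimal sketch. InProcessBuffer is a hypothetical class, not the author's MessageBufferProcess and not a Flink API. It assumes the high-water mark (cf. withHWM(20)) means "maximum number of buffered messages" and the timeout (cf. withTimeout(10000)) is in milliseconds; both readings are guesses. It shows only the buffering, not the network communication with the other data stream processor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-process replacement for a separate buffer process.
final class InProcessBuffer<T> {

    private final BlockingQueue<T> queue;
    private final long timeoutMillis;

    public InProcessBuffer(int highWaterMark, long timeoutMillis) {
        // The queue never holds more than highWaterMark messages.
        this.queue = new ArrayBlockingQueue<>(highWaterMark);
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a message; returns false if the buffer stayed full for the whole timeout. */
    public boolean put(T message) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Removes the oldest message; returns null if none arrived within the timeout. */
    public T take() {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public int size() {
        return queue.size();
    }
}
```

A sink or source would create one such buffer when it is opened and share it with the thread that talks to the other processor, so no second JVM and no second classpath are involved.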



Re: Starting a separate Java process within a Flink cluster

Posted by "Ly, The Anh" <th...@campus.tu-berlin.de>.
Yes, I did. It is definitely there. I made a separate Maven project to test whether something was wrong with my jar.
The resulting shaded jar of that test project was fine, and the message-buffer-process ran with that test jar.




Re: Starting a separate Java process within a Flink cluster

Posted by Yun Tang <my...@live.com>.
Hi

Since you use the message-buffer-process as a dependency and the error tells you that the class was not found, have you checked whether your application jar actually contains MessageBufferProcess.class? If it does not, try using the maven-assembly-plugin <https://maven.apache.org/plugins/maven-assembly-plugin/> or the maven-shade-plugin <https://maven.apache.org/plugins/maven-shade-plugin/> to include your classes.

Best
Yun Tang
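
One way to run that check programmatically is sketched below. JarClassCheck is a hypothetical helper, not part of the original project; the jar path is whatever your build produces. It uses only the standard java.util.jar API to look up the .class entry that corresponds to a fully qualified class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

// Hypothetical helper for checking the contents of a packaged jar.
final class JarClassCheck {

    private JarClassCheck() {}

    /** Converts a fully qualified class name into its entry name inside a jar. */
    public static String entryNameOf(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath contains the given class. */
    public static boolean containsClass(String jarPath, String className) {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryNameOf(className)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Usage: java JarClassCheck <path-to-jar> <fully.qualified.ClassName>
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: JarClassCheck <jar> <class name>");
            return;
        }
        System.out.println(containsClass(args[0], args[1]) ? "found" : "missing");
    }
}
```

Note that this only confirms the class is inside the jar. It says nothing about whether a newly started JVM has that jar on its classpath.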