You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2018/11/07 14:52:52 UTC

Re: RichInputFormat working differently in eclipse and in flink cluster

Hi Teena,

which Flink version are you using? Have you tried whether this happens with
the latest release 1.6.2 as well?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <
teena.kappen@bprise.com> wrote:

> Hi all,
>
>
>
> I have implemented RichInputFormat for reading result of aggregation
> queries in Elasticsearch. There are around 100000 buckets, which are of
> type json array. Note: This is one time response.
>
>
>
> My idea here is to iterate these arrays in parallel. Here is the pseudo
> code.
>
>
>
> public void configure(Configuration parameters) {
>
> System.out.println("configure");
>
> }
>
>
>
> public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
>
> }
>
>
>
> public ResponseInputSplit[] createInputSplits(int minNumSplits){
>
> System.out.println("createInputSplits");
>
>
>
> //read from elastic
>
> // add buckets to array
>
> }
>
>
>
> public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[]
> inputSplits) {
>
> //this is default
>
> System.out.println("getInputSplitAssigner");
>
> return new DefaultInputSplitAssigner(inputSplits);
>
> }
>
>
>
> public void open(ResponseInputSplit split) {
>
> //read buckets
>
> }
>
>
>
> public boolean reachedEnd(){
>
> System.out.println("reachedEnd");
>
> }
>
>
>
> public Bounce nextRecord(Bounce reuse) {
>
> }
>
>
>
> public void close(){
>
> }
>
>
>
> // my main method,
>
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
>
> DataSet<Bounce> bounce_data_set = env.createInput(new
> MyInputDataSetInputFormat());
>
>
>
> When running in eclipse, it executes createInputSplits and the results
> look fine. Logs are given below.
>
> Output is à
>
> configure
>
> Connected to JobManager at
> Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
>
> configure
>
> createInputSplits
>
>
>
> When submitting job in flink cluster, it doesn’t execute ‘configure’ and
> ‘createInputSplits’ methods. Instead it directly goes to nextRecord
> function. Logs are given below.
>
> Output is à
>
> Starting execution of program
>
> configure
>
> Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for
> job completion.
>
> Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx
> /user/jobmanager#1219973491] with leader session id...
>
> 10/26/2018 15:05:57     Job execution switched to status RUNNING.
>
> 10/26/2018 15:05:57     DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
>
> 10/26/2018 15:05:57     DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
>
> 10/26/2018 15:06:00     DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
>
> 10/26/2018 15:06:00     DataSource (at
> createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
>
> java.lang.NullPointerException
>
>                                at com.xxx.test.
> MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)
>
>
>
> Regards,
>
> Teena
>
>
>

RE: RichInputFormat working differently in eclipse and in flink cluster

Posted by Teena Kappen // BPRISE <te...@bprise.com>.
Hi Till,

We are using 1.4.0. We have not tried this any other releases.

We will try this on 1.6.2 and see what happens.

Thank you.

Regards,
Teena

From: Till Rohrmann <tr...@apache.org>
Sent: 07 November 2018 20:23
To: Teena Kappen // BPRISE <te...@bprise.com>
Cc: user <us...@flink.apache.org>
Subject: Re: RichInputFormat working differently in eclipse and in flink cluster

Hi Teena,

which Flink version are you using? Have you tried whether this happens with the latest release 1.6.2 as well?

Cheers,
Till

On Fri, Oct 26, 2018 at 1:17 PM Teena Kappen // BPRISE <te...@bprise.com>> wrote:
Hi all,

I have implemented RichInputFormat for reading result of aggregation queries in Elasticsearch. There are around 100000 buckets, which are of type json array. Note: This is one time response.

My idea here is to iterate these arrays in parallel. Here is the pseudo code.

public void configure(Configuration parameters) {
System.out.println("configure");
}

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
}

public ResponseInputSplit[] createInputSplits(int minNumSplits){
System.out.println("createInputSplits");

//read from elastic
// add buckets to array
}

public InputSplitAssigner getInputSplitAssigner(ResponseInputSplit[] inputSplits) {
//this is default
System.out.println("getInputSplitAssigner");
return new DefaultInputSplitAssigner(inputSplits);
}

public void open(ResponseInputSplit split) {
//read buckets
}

public boolean reachedEnd(){
System.out.println("reachedEnd");
}

public Bounce nextRecord(Bounce reuse) {
}

public void close(){
}

// my main method,
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Bounce> bounce_data_set = env.createInput(new MyInputDataSetInputFormat());

When running in eclipse, it executes createInputSplits and the results look fine. Logs are given below.
Output is -->
configure
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1685591882] with leader session id...
configure
createInputSplits

When submitting job in flink cluster, it doesn’t execute ‘configure’ and ‘createInputSplits’ methods. Instead it directly goes to nextRecord function. Logs are given below.
Output is -->
Starting execution of program
configure
Submitting job with JobID: 47526660fc9a463cad4bee04a4ba99d9. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@xxxx:xxx /user/jobmanager#1219973491] with leader session id...
10/26/2018 15:05:57     Job execution switched to status RUNNING.
10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to SCHEDULED
10/26/2018 15:05:57     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to DEPLOYING
10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to RUNNING
10/26/2018 15:06:00     DataSource (at createInput(ExecutionEnvironment.java:547) ())(1/1) switched to FAILED
java.lang.NullPointerException
                               at com.xxx.test. MyInputDataSetInputFormat.nextRecord(MyInputDataSetInputFormat.java:143)

Regards,
Teena