You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lydia Ickler <ic...@googlemail.com> on 2016/02/01 11:28:39 UTC

Re: cluster execution

Hi Till,

thanks for your reply!
I tested it with the Wordcount example.
Everything works fine if I run the command:
./flink run -p 3 /home/flink/examples/WordCount.jar
Then the program gets executed by my 3 workers. 
If I want to save the output to a file:
./flink run -p 3 /home/flink/examples/WordCount.jar hdfs://grips2:9000/users/Flink_1000.csv hdfs://grips2:9000/users/Wordcount_1000 <hdfs://grips2:9000/users/Wordcount_1000>

I get the following error message:
What am I doing wrong? Is something wrong with my cluster writing permissions?

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: hdfs://grips2:9000/users/Wordcount_1000s, delimiter:  ))': Output directory could not be created.
	at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:348)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: hdfs://grips2:9000/users/Wordcount_1000s, delimiter:  ))': Output directory could not be created.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:867)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:851)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:851)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:341)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Output directory could not be created.
	at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:295)
	at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:863)
	... 29 more

The exception above occurred while trying to run your command.


> Am 28.01.2016 um 10:44 schrieb Till Rohrmann <ti...@gmail.com>:
> 
> Hi Lydia,
> 
> what do you mean with master? Usually when you submit a program to the cluster and don’t specify the parallelism in your program, then it will be executed with the parallelism.default value as parallelism. You can specify the value in your cluster configuration flink-config.yaml file. Alternatively you can always specify the parallelism via the CLI client with the -p option.
> 
> Cheers,
> Till
> 
> 
> On Thu, Jan 28, 2016 at 9:53 AM, Lydia Ickler <icklerly@googlemail.com <ma...@googlemail.com>> wrote:
> Hi all,
> 
> I am doing some operations on a DataSet<Tuple3<Integer,Integer,Double>> … (see code below)
> When I run my program on a cluster with 3 machines I can see within the web client that only my master is executing the program. 
> Do I have to specify somewhere that all machines have to participate? Usually the cluster executes in parallel.
> 
> Any suggestions?
> 
> Best regards, 
> Lydia
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
> DataSet<Tuple3<Integer, Integer, Double>> initial = matrixA.groupBy(0).sum(2);
> 
> //normalize by maximum value
> initial = initial.cross(initial.max(2)).map(new normalizeByMax());
> matrixA.join(initial).where(1).equalTo(0)
>       .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2);
> 
> 


Re: cluster execution

Posted by Lydia Ickler <ic...@googlemail.com>.
xD…  a simple "hdfs dfs -chmod -R 777 /users" fixed it!


> Am 01.02.2016 um 12:17 schrieb Till Rohrmann <tr...@apache.org>:
> 
> Hi Lydia,
> 
> I looks like that. I guess you should check your hdfs access rights. 
> 
> Cheers,
> Till
> 
> On Mon, Feb 1, 2016 at 11:28 AM, Lydia Ickler <icklerly@googlemail.com <ma...@googlemail.com>> wrote:
> Hi Till,
> 
> thanks for your reply!
> I tested it with the Wordcount example.
> Everything works fine if I run the command:
> ./flink run -p 3 /home/flink/examples/WordCount.jar
> Then the program gets executed by my 3 workers. 
> If I want to save the output to a file:
> ./flink run -p 3 /home/flink/examples/WordCount.jar hdfs://grips2:9000/users/Flink_1000.csv <> hdfs://grips2:9000/users/Wordcount_1000 <>
> 
> I get the following error message:
> What am I doing wrong? Is something wrong with my cluster writing permissions?
> 
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: hdfs://grips2:9000/users/Wordcount_1000s <>, delimiter:  ))': Output directory could not be created.
> 	at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
> 	at org.apache.flink.client.program.Client.runBlocking(Client.java:348)
> 	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> 	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
> 	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:483)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
> 	at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
> 	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: hdfs://grips2:9000/users/Wordcount_1000s <>, delimiter:  ))': Output directory could not be created.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:867)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:851)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:851)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:341)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Output directory could not be created.
> 	at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:295)
> 	at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:863)
> 	... 29 more
> 
> The exception above occurred while trying to run your command.
> 
> 
>> Am 28.01.2016 um 10:44 schrieb Till Rohrmann <till.rohrmann@gmail.com <ma...@gmail.com>>:
>> 
>> Hi Lydia,
>> 
>> what do you mean with master? Usually when you submit a program to the cluster and don’t specify the parallelism in your program, then it will be executed with the parallelism.default value as parallelism. You can specify the value in your cluster configuration flink-config.yaml file. Alternatively you can always specify the parallelism via the CLI client with the -p option.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Thu, Jan 28, 2016 at 9:53 AM, Lydia Ickler <icklerly@googlemail.com <ma...@googlemail.com>> wrote:
>> Hi all,
>> 
>> I am doing some operations on a DataSet<Tuple3<Integer,Integer,Double>> … (see code below)
>> When I run my program on a cluster with 3 machines I can see within the web client that only my master is executing the program. 
>> Do I have to specify somewhere that all machines have to participate? Usually the cluster executes in parallel.
>> 
>> Any suggestions?
>> 
>> Best regards, 
>> Lydia
>> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>> DataSet<Tuple3<Integer, Integer, Double>> initial = matrixA.groupBy(0).sum(2);
>> 
>> //normalize by maximum value
>> initial = initial.cross(initial.max(2)).map(new normalizeByMax());
>> matrixA.join(initial).where(1).equalTo(0)
>>       .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2);
>> 
>> 
> 
> 


Re: cluster execution

Posted by Till Rohrmann <tr...@apache.org>.
Hi Lydia,

I looks like that. I guess you should check your hdfs access rights.

Cheers,
Till

On Mon, Feb 1, 2016 at 11:28 AM, Lydia Ickler <ic...@googlemail.com>
wrote:

> Hi Till,
>
> thanks for your reply!
> I tested it with the Wordcount example.
> Everything works fine if I run the command:
> ./flink run -p 3 /home/flink/examples/WordCount.jar
> Then the program gets executed by my 3 workers.
> If I want to save the output to a file:
> ./flink run -p 3 /home/flink/examples/WordCount.jar
> hdfs://grips2:9000/users/Flink_1000.csv
> hdfs://grips2:9000/users/Wordcount_1000
>
> I get the following error message:
> What am I doing wrong? Is something wrong with my cluster writing
> permissions?
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path:
> hdfs://grips2:9000/users/Wordcount_1000s, delimiter:  ))': Output
> directory could not be created.
> at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:348)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'DataSink (CsvOutputFormat (path:
> hdfs://grips2:9000/users/Wordcount_1000s, delimiter:  ))': Output
> directory could not be created.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:867)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:851)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.flink.runtime.jobmanager.JobManager.org
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:851)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:341)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Output directory could not be created.
> at
> org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:295)
> at
> org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:863)
> ... 29 more
>
> The exception above occurred while trying to run your command.
>
>
> Am 28.01.2016 um 10:44 schrieb Till Rohrmann <ti...@gmail.com>:
>
> Hi Lydia,
>
> what do you mean with master? Usually when you submit a program to the
> cluster and don’t specify the parallelism in your program, then it will be
> executed with the parallelism.default value as parallelism. You can
> specify the value in your cluster configuration flink-config.yaml file.
> Alternatively you can always specify the parallelism via the CLI client
> with the -p option.
>
> Cheers,
> Till
> ​
>
> On Thu, Jan 28, 2016 at 9:53 AM, Lydia Ickler <ic...@googlemail.com>
> wrote:
>
>> Hi all,
>>
>> I am doing some operations on a DataSet<Tuple3<Integer,Integer,Double>> …
>> (see code below)
>> When I run my program on a cluster with 3 machines I can see within the
>> web client that only my master is executing the program.
>> Do I have to specify somewhere that all machines have to participate?
>> Usually the cluster executes in parallel.
>>
>> Any suggestions?
>>
>> Best regards,
>> Lydia
>>
>> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>>
>> DataSet<Tuple3<Integer, Integer, Double>> initial = matrixA.groupBy(0).sum(2);
>>
>> //normalize by maximum value
>> initial = initial.cross(initial.max(2)).map(new normalizeByMax());
>>
>> matrixA.join(initial).where(1).equalTo(0)
>>
>>       .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2);
>>
>>
>>
>
>