You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by hagersaleh <lo...@yahoo.com> on 2015/04/27 14:32:06 UTC

error when running program: left outer join

I want implement left outer join from two dataset i use Tuple data type


package org.apache.flink.examples.java.relational;


import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.io.File;

@SuppressWarnings("serial")
public class TPCHQuery3 {

// field name in customer table
 
public static class LeftOuterJoin implements
CoGroupFunction<Tuple2&lt;Tuple1&lt;Integer>, String>,
Tuple2<Tuple1&lt;Integer>, String>,
Tuple2<Tuple1&lt;Integer>,Tuple1<Integer>>> {

    @Override
    public void coGroup(Iterable<Tuple2&lt;Tuple1&lt;Integer>, String>>
leftElements,
                        Iterable<Tuple2&lt;Tuple1&lt;Integer>, String>>
rightElements,
                       
Collector<Tuple2&lt;Tuple1&lt;Integer>,Tuple1<Integer>>> out) throws
Exception {

            

            for (Tuple2<Tuple1&lt;Integer>, String> leftElem : leftElements)
{
                    boolean hadElements = false;
                    for (Tuple2<Tuple1&lt;Integer>, String> rightElem :
rightElements) {
                            out.collect(new
Tuple2<Tuple1&lt;Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0));
                            hadElements = true;
                    }
                    if (!hadElements) {
                            out.collect(new Tuple2<Tuple1&lt;Integer>,
Tuple1<Integer>>(leftElem.f0, null));
                    }
            }

    }
  }
	
public static void main(String[] args) throws Exception {
      
      
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple1&lt;Integer>>
leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
					.fieldDelimiter('|')
                                       
.includeFields("10000000").ignoreFirstLine()
                                        .types(Integer.class);

   // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
    DataSet<Tuple2&lt;Tuple1&lt;Integer>, String>> leftSide2 = leftSide.map(
        new MapFunction<Tuple1&lt;Integer>, Tuple2<Tuple1&lt;Integer>,
String>>() {
                @Override
                public Tuple2<Tuple1&lt;Integer>, String>
map(Tuple1<Integer> x) throws Exception {
                        return new Tuple2<Tuple1&lt;Integer>, String>(x,
"some data");
                }
        });
DataSet<Tuple1&lt;Integer>>
rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
					.fieldDelimiter('|')
                                       
.includeFields("010000000").ignoreFirstLine()
                                        .types(Integer.class);
   // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8, 9,
10);
    DataSet<Tuple2&lt;Tuple1&lt;Integer>, String>> rightSide2 =
rightSide.map(
        new MapFunction<Tuple1&lt;Integer>, Tuple2<Tuple1&lt;Integer>,
String>>() {
                @Override
                public Tuple2<Tuple1&lt;Integer>, String>
map(Tuple1<Integer> x) throws Exception {
                        return new Tuple2<Tuple1&lt;Integer>, String>(x,
"some other data");
                }
        });
    DataSet<Tuple2&lt;Tuple1&lt;Integer>, Tuple1<Integer>>> leftOuterJoin =
leftSide2.coGroup(rightSide2)
            .where(0)
            .equalTo(0)
            .with(new LeftOuterJoin());

    leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv",
"\n", "|");;
    env.execute();

}
    

Error code After run programs
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException:
org.apache.flink.types.NullFieldException: Field 1 is null, but expected to
hold a value.
    at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97)
    at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
    at
org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
    at
org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
    at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
    at
org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38)
    at
org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130)
    at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
    at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
    at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
    at java.lang.Thread.run(Thread.java:724)

    at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
    at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
    at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
    at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540)
    at
org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80)



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: error when eun program left outer join

Posted by hagersaleh <lo...@yahoo.com>.
I solved my problem — thank you very much.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141p1146.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: error when eun program left outer join

Posted by hagersaleh <lo...@yahoo.com>.
Implement a left outer join of two datasets, Customer and Orders,
using the Tuple data type.



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141p1143.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: error when eun program left outer join

Posted by Robert Metzger <rm...@apache.org>.
Hi,

what data are you using?

The exception says "NullFieldException: Field 1 is null, but expected to hold
a value.". Maybe the data is not in the right format?

On Mon, Apr 27, 2015 at 2:32 PM, hagersaleh <lo...@yahoo.com> wrote:

> I want implement left outer join from two dataset i use Tuple data type
>
>
> package org.apache.flink.examples.java.relational;
>
>
> import org.apache.flink.api.common.functions.CoGroupFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> import java.io.File;
>
> @SuppressWarnings("serial")
> public class TPCHQuery3 {
>
> //filed name in cutomer table
>
> public static class LeftOuterJoin implements
> CoGroupFunction<Tuple2<Tuple1<Integer>, String>,
> Tuple2<Tuple1<Integer>, String>,
> Tuple2<Tuple1<Integer>,Tuple1<Integer>>> {
>
>     @Override
>     public void coGroup(Iterable<Tuple2<Tuple1<Integer>, String>>
> leftElements,
>                         Iterable<Tuple2<Tuple1<Integer>, String>>
> rightElements,
>
> Collector<Tuple2<Tuple1<Integer>,Tuple1<Integer>>> out) throws
> Exception {
>
>
>
>             for (Tuple2<Tuple1<Integer>, String> leftElem : leftElements)
> {
>                     boolean hadElements = false;
>                     for (Tuple2<Tuple1<Integer>, String> rightElem :
> rightElements) {
>                             out.collect(new
> Tuple2<Tuple1<Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0));
>                             hadElements = true;
>                     }
>                     if (!hadElements) {
>                             out.collect(new Tuple2<Tuple1<Integer>,
> Tuple1<Integer>>(leftElem.f0, null));
>                     }
>             }
>
>     }
>   }
>
> public static void main(String[] args) throws Exception {
>
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple1<Integer>>
> leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
>                                         .fieldDelimiter('|')
>
> .includeFields("10000000").ignoreFirstLine()
>                                         .types(Integer.class);
>
>    // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
>     DataSet<Tuple2<Tuple1<Integer>, String>> leftSide2 = leftSide.map(
>         new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
> String>>() {
>                 @Override
>                 public Tuple2<Tuple1<Integer>, String>
> map(Tuple1<Integer> x) throws Exception {
>                         return new Tuple2<Tuple1<Integer>, String>(x,
> "some data");
>                 }
>         });
> DataSet<Tuple1<Integer>>
> rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
>                                         .fieldDelimiter('|')
>
> .includeFields("010000000").ignoreFirstLine()
>                                         .types(Integer.class);
>    // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8,
> 9,
> 10);
>     DataSet<Tuple2<Tuple1<Integer>, String>> rightSide2 =
> rightSide.map(
>         new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
> String>>() {
>                 @Override
>                 public Tuple2<Tuple1<Integer>, String>
> map(Tuple1<Integer> x) throws Exception {
>                         return new Tuple2<Tuple1<Integer>, String>(x,
> "some other data");
>                 }
>         });
>     DataSet<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> leftOuterJoin =
> leftSide2.coGroup(rightSide2)
>             .where(0)
>             .equalTo(0)
>             .with(new LeftOuterJoin());
>
>     leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv",
> "\n", "|");;
>     env.execute();
>
> }
>
>
> Error code After run programs
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException:
> org.apache.flink.types.NullFieldException: Field 1 is null, but expected to
> hold a value.
>     at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97)
>     at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at
>
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>     at
>
> org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>     at
>
> org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>     at
>
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>     at
>
> org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38)
>     at
>
> org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130)
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>     at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>     at java.lang.Thread.run(Thread.java:724)
>
>     at
>
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
>     at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
>     at
>
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
>     at
>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540)
>     at
>
> org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80)
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>