You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by hagersaleh <lo...@yahoo.com> on 2015/04/27 14:32:06 UTC
error when running program with left outer join
I want implement left outer join from two dataset i use Tuple data type
package org.apache.flink.examples.java.relational;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.io.File;
@SuppressWarnings("serial")
public class TPCHQuery3 {
// field name in customer table
/**
 * Left outer join implemented as a {@link CoGroupFunction}: for every left
 * element, emits one (leftKey, rightKey) pair per matching right element;
 * left elements with no match are emitted with a sentinel right key.
 *
 * <p>Fix for the reported {@code NullFieldException}: Flink's
 * {@code TupleSerializer} does not allow {@code null} tuple fields, so an
 * unmatched left row must be padded with a real {@code Tuple1} value rather
 * than {@code null}. {@code Integer.MIN_VALUE} is used here as the
 * "no matching right row" marker.
 */
public static class LeftOuterJoin implements
        CoGroupFunction<Tuple2<Tuple1<Integer>, String>,
                Tuple2<Tuple1<Integer>, String>,
                Tuple2<Tuple1<Integer>, Tuple1<Integer>>> {

    /** Sentinel key emitted when a left element has no right-side match. */
    private static final Tuple1<Integer> NO_MATCH = new Tuple1<Integer>(Integer.MIN_VALUE);

    @Override
    public void coGroup(Iterable<Tuple2<Tuple1<Integer>, String>> leftElements,
                        Iterable<Tuple2<Tuple1<Integer>, String>> rightElements,
                        Collector<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> out)
            throws Exception {
        for (Tuple2<Tuple1<Integer>, String> leftElem : leftElements) {
            boolean hadElements = false;
            for (Tuple2<Tuple1<Integer>, String> rightElem : rightElements) {
                out.collect(new Tuple2<Tuple1<Integer>, Tuple1<Integer>>(
                        leftElem.f0, rightElem.f0));
                hadElements = true;
            }
            if (!hadElements) {
                // Tuples may not contain null fields (NullFieldException),
                // so pad the unmatched left row with the sentinel key.
                out.collect(new Tuple2<Tuple1<Integer>, Tuple1<Integer>>(
                        leftElem.f0, NO_MATCH));
            }
        }
    }
}
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple1<Integer>>
leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
.fieldDelimiter('|')
.includeFields("10000000").ignoreFirstLine()
.types(Integer.class);
// DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
DataSet<Tuple2<Tuple1<Integer>, String>> leftSide2 = leftSide.map(
new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
String>>() {
@Override
public Tuple2<Tuple1<Integer>, String>
map(Tuple1<Integer> x) throws Exception {
return new Tuple2<Tuple1<Integer>, String>(x,
"some data");
}
});
DataSet<Tuple1<Integer>>
rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
.fieldDelimiter('|')
.includeFields("010000000").ignoreFirstLine()
.types(Integer.class);
// DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8, 9,
10);
DataSet<Tuple2<Tuple1<Integer>, String>> rightSide2 =
rightSide.map(
new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
String>>() {
@Override
public Tuple2<Tuple1<Integer>, String>
map(Tuple1<Integer> x) throws Exception {
return new Tuple2<Tuple1<Integer>, String>(x,
"some other data");
}
});
DataSet<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> leftOuterJoin =
leftSide2.coGroup(rightSide2)
.where(0)
.equalTo(0)
.with(new LeftOuterJoin());
leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv",
"\n", "|");;
env.execute();
}
Error output after running the program:
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException:
org.apache.flink.types.NullFieldException: Field 1 is null, but expected to
hold a value.
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
at
org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
at
org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
at
org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38)
at
org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
at java.lang.Thread.run(Thread.java:724)
at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540)
at
org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80)
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: error when running program with left outer join
Posted by hagersaleh <lo...@yahoo.com>.
I solved my problem, thanks very much
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141p1146.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: error when running program with left outer join
Posted by hagersaleh <lo...@yahoo.com>.
implement left outer join from two dataset Customer and Orders
using Tuple data type
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141p1143.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: error when running program with left outer join
Posted by Robert Metzger <rm...@apache.org>.
Hi,
what data are you using?
The exception says "NullFieldException: Field 1 is null, but expected to hold
a value.". Maybe the data is not in the right format?
On Mon, Apr 27, 2015 at 2:32 PM, hagersaleh <lo...@yahoo.com> wrote:
> I want implement left outer join from two dataset i use Tuple data type
>
>
> package org.apache.flink.examples.java.relational;
>
>
> import org.apache.flink.api.common.functions.CoGroupFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> import java.io.File;
>
> @SuppressWarnings("serial")
> public class TPCHQuery3 {
>
> //filed name in cutomer table
>
> public static class LeftOuterJoin implements
> CoGroupFunction<Tuple2<Tuple1<Integer>, String>,
> Tuple2<Tuple1<Integer>, String>,
> Tuple2<Tuple1<Integer>,Tuple1<Integer>>> {
>
> @Override
> public void coGroup(Iterable<Tuple2<Tuple1<Integer>, String>>
> leftElements,
> Iterable<Tuple2<Tuple1<Integer>, String>>
> rightElements,
>
> Collector<Tuple2<Tuple1<Integer>,Tuple1<Integer>>> out) throws
> Exception {
>
>
>
> for (Tuple2<Tuple1<Integer>, String> leftElem : leftElements)
> {
> boolean hadElements = false;
> for (Tuple2<Tuple1<Integer>, String> rightElem :
> rightElements) {
> out.collect(new
> Tuple2<Tuple1<Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0));
> hadElements = true;
> }
> if (!hadElements) {
> out.collect(new Tuple2<Tuple1<Integer>,
> Tuple1<Integer>>(leftElem.f0, null));
> }
> }
>
> }
> }
>
> public static void main(String[] args) throws Exception {
>
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple1<Integer>>
> leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
> .fieldDelimiter('|')
>
> .includeFields("10000000").ignoreFirstLine()
> .types(Integer.class);
>
> // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
> DataSet<Tuple2<Tuple1<Integer>, String>> leftSide2 = leftSide.map(
> new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
> String>>() {
> @Override
> public Tuple2<Tuple1<Integer>, String>
> map(Tuple1<Integer> x) throws Exception {
> return new Tuple2<Tuple1<Integer>, String>(x,
> "some data");
> }
> });
> DataSet<Tuple1<Integer>>
> rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv")
> .fieldDelimiter('|')
>
> .includeFields("010000000").ignoreFirstLine()
> .types(Integer.class);
> // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8,
> 9,
> 10);
> DataSet<Tuple2<Tuple1<Integer>, String>> rightSide2 =
> rightSide.map(
> new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>,
> String>>() {
> @Override
> public Tuple2<Tuple1<Integer>, String>
> map(Tuple1<Integer> x) throws Exception {
> return new Tuple2<Tuple1<Integer>, String>(x,
> "some other data");
> }
> });
> DataSet<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> leftOuterJoin =
> leftSide2.coGroup(rightSide2)
> .where(0)
> .equalTo(0)
> .with(new LeftOuterJoin());
>
> leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv",
> "\n", "|");;
> env.execute();
>
> }
>
>
> Error code After run programs
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException:
> org.apache.flink.types.NullFieldException: Field 1 is null, but expected to
> hold a value.
> at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97)
> at
>
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at
>
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
> at
>
> org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
> at
>
> org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
> at
>
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
> at
>
> org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38)
> at
>
> org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130)
> at
>
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
> at
>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
> at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
> at java.lang.Thread.run(Thread.java:724)
>
> at
>
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> at
>
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> at
>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540)
> at
>
> org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80)
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>