You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "David Dreyfus (JIRA)" <ji...@apache.org> on 2017/10/25 19:31:00 UTC
[jira] [Created] (FLINK-7926) Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
David Dreyfus created FLINK-7926:
------------------------------------
Summary: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
Key: FLINK-7926
URL: https://issues.apache.org/jira/browse/FLINK-7926
Project: Flink
Issue Type: Bug
Components: Distributed Coordination
Affects Versions: 1.3.2
Environment: standalone execution on MacBook Pro
in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3.
taskmanager.heap.mb = 1024
taskmanager.memory.preallocate = false
taskmanager.numberOfTaskSlots = 3
Reporter: David Dreyfus
The following exception is thrown as the number of tasks increases.
{code:java}
10/25/2017 14:26:16 LeftOuterJoin(Join at with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
10/25/2017 14:26:16 Job execution switched to status FAILING.
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
{code}
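I run with the following command (note: the main class pasted below is named Union2, while the -c argument names Union3; adjust one to match the other when reproducing):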
{code:java}
flink run -c com.northbay.union.Union3 ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar --left /Users/user/Documents/Flink/Quickstart/Files/manysmall --right /Users/user/Documents/Flink/Quickstart/Files/manysmall --output /tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50
{code}
All input files are CSV with the columns (int, string, short), read as (device, fingerprint, dma).
This is the code (split it into 3 separate files before using it). The test hash-joins pairs of files and combines their results.
{code:java}
package com.northbay.hashcount;
/**
 * Result record of the left outer join in Union2.
 *
 * <p>It carries the same three columns as {@link DeviceRecord} (device, fingerprint, dma)
 * plus a {@code match} flag. Both non-default constructors leave {@code match} as
 * {@code false}; the join function sets it afterwards when a right-side partner exists.
 */
public class DeviceRecord1 {
    public int device;
    public String fingerprint;
    public short dma;
    public boolean match;

    /** Creates a record from explicit column values; {@code match} starts as {@code false}. */
    public DeviceRecord1(int device, String fingerprint, short dma) {
        this.device = device;
        this.fingerprint = fingerprint;
        this.dma = dma;
        this.match = false;
    }

    /**
     * Copies device, fingerprint and dma from {@code old}; {@code match} starts as
     * {@code false}. The fingerprint reference is shared, not copied.
     */
    public DeviceRecord1(DeviceRecord old) {
        this(old.device, old.fingerprint, old.dma);
    }

    /** Empty constructor; every field keeps its Java default value. */
    public DeviceRecord1() {
    }
}
package com.northbay.hashcount;
/**
 * One row of an input CSV file, in column order (device, fingerprint, dma).
 *
 * <p>Union2 reads instances with {@code pojoType(DeviceRecord.class, "device", "fingerprint",
 * "dma")} and joins on the {@code fingerprint} field by name. The public fields and the public
 * empty constructor are kept accordingly (Flink's POJO rules — see the Flink type
 * serialization docs).
 */
public class DeviceRecord {
    public int device;
    public String fingerprint;
    public short dma;

    /** Creates a record from explicit column values. */
    public DeviceRecord(int device, String fingerprint, short dma) {
        this.device = device;
        this.fingerprint = fingerprint;
        this.dma = dma;
    }

    /** Shallow copy of {@code old}; the fingerprint reference is shared. */
    public DeviceRecord(DeviceRecord old) {
        this(old.device, old.fingerprint, old.dma);
    }

    /** Empty constructor; every field keeps its Java default value. */
    public DeviceRecord() {
    }
}
package com.northbay.union;
import com.northbay.hashcount.DeviceRecord;
import com.northbay.hashcount.DeviceRecord1;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
/**
 * Batch job that left-outer-joins pairs of CSV files on {@code fingerprint} and counts the
 * joined records.
 *
 * <p>For every index {@code i} in {@code [0, filecount)} the job:
 * <ol>
 *   <li>reads {@code <left>/i} and {@code <right>/i};</li>
 *   <li>drops right-side records with {@code dma == 4};</li>
 *   <li>left-outer-joins the two sides, flagging each left record with whether it found a
 *       partner.</li>
 * </ol>
 * The per-pair results are unioned and two counts are produced:
 * <ul>
 *   <li>unmatched left records per DMA, written to {@code --output} (or printed);</li>
 *   <li>matched left records per device, written to {@code --output2} (or printed).</li>
 * </ul>
 *
 * <p>Parameters: {@code --left}, {@code --right} (input directories), {@code --filecount}
 * (number of file pairs, parsed with {@link Integer#decode}), and the optional
 * {@code --output} and {@code --output2}.
 */
@SuppressWarnings("serial")
public class Union2 {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    /**
     * Builds the job and runs it.
     *
     * @param args command-line arguments, parsed with {@link ParameterTool#fromArgs}
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        if (!params.has("left") || !params.has("right")) {
            // Previously this fell through to env.execute() with no sinks, which throws.
            System.out.println("Use --left and --right to specify input paths.");
            return;
        }

        // Parse once instead of on every loop iteration. If --filecount is missing,
        // getRequired() names it, rather than failing with an NPE in Integer.decode(null).
        final int fileCount = Integer.decode(params.getRequired("filecount"));

        DataSet<DeviceRecord1> joinedData = null;
        for (int i = 0; i < fileCount; i++) {
            DataSet<DeviceRecord1> pair = joinFilePair(env, params, i);
            joinedData = (joinedData == null) ? pair : joinedData.union(pair);
        }

        if (joinedData == null) {
            // filecount <= 0: no data set, hence no sink; env.execute() would throw.
            System.out.println("Nothing to join: --filecount must be positive.");
            return;
        }

        // Count unmatched left records by DMA: group on tuple field 0 (the DMA), sum field 1.
        DataSet<Tuple2<Integer, Integer>> countsByDma = joinedData
                .flatMap(new Mapper(false))
                .groupBy(0)
                .sum(1);
        emitCounts(countsByDma, params, "output");

        // Count matched left records by device: group on tuple field 0 (the device), sum field 1.
        DataSet<Tuple2<Integer, Integer>> countsByDevice = joinedData
                .flatMap(new Mapper2(true))
                .groupBy(0)
                .sum(1);

        // print() executes the plan eagerly. It consumes every sink defined so far, including a
        // writeAsCsv sink from the first emit, and a later env.execute() with no new sinks
        // throws. So only the last emit decides whether anything is still pending.
        if (emitCounts(countsByDevice, params, "output2")) {
            env.execute("Union2");
        }
    }

    /**
     * Left-outer-joins {@code <left>/index} with {@code <right>/index} on {@code fingerprint}.
     *
     * <p>Right-side records are passed through {@link MyFilter} first. Each join result
     * becomes one {@link DeviceRecord1} copied from the left record, with {@code match} set
     * to whether a right-side record was present.
     */
    private static DataSet<DeviceRecord1> joinFilePair(
            ExecutionEnvironment env, ParameterTool params, int index) {
        // read the text file from given input path
        DataSet<DeviceRecord> left = env.readCsvFile(params.get("left") + "/" + Integer.toString(index))
                .pojoType(DeviceRecord.class, "device", "fingerprint", "dma");

        // read the text file from given input path
        DataSet<DeviceRecord> right = env.readCsvFile(params.get("right") + "/" + Integer.toString(index))
                .pojoType(DeviceRecord.class, "device", "fingerprint", "dma")
                .filter(new MyFilter());

        return left.leftOuterJoin(right)
                .where("fingerprint")
                .equalTo("fingerprint")
                .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
                    @Override
                    public void join(DeviceRecord l, DeviceRecord r, Collector<DeviceRecord1> out) throws Exception {
                        // r is null when the outer join found no partner for l.
                        DeviceRecord1 d = new DeviceRecord1(l);
                        d.match = r != null;
                        out.collect(d);
                    }
                });
    }

    /**
     * Writes {@code counts} as CSV to the path given by {@code --<outputKey>}. When that
     * parameter is absent, prints {@code counts} to stdout instead.
     *
     * @param counts    the data set to emit
     * @param params    parsed command-line parameters
     * @param outputKey parameter name (without dashes) holding the output path
     * @return {@code true} if a lazily executed CSV sink was registered, so a later
     *     {@code env.execute()} is still needed; {@code false} if the data set was printed,
     *     which executes the plan immediately
     * @throws Exception if printing (and thereby executing) the plan fails
     */
    private static boolean emitCounts(
            DataSet<Tuple2<Integer, Integer>> counts, ParameterTool params, String outputKey) throws Exception {
        if (params.has(outputKey)) {
            counts.writeAsCsv(params.get(outputKey), "\n", ", ");
            return true;
        }
        System.out.println("Printing result to stdout. Use --" + outputKey + " to specify output path.");
        counts.print();
        return false;
    }

    /**
     * Emits {@code (dma, 1)} for every {@link DeviceRecord1} whose {@code match} flag equals
     * the configured value and drops all other records. Grouping on field 0 and summing
     * field 1 then yields a per-DMA record count.
     */
    public static final class Mapper implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
        private final boolean match;

        Mapper(boolean match) {
            this.match = match;
        }

        Mapper() {
            this(false);
        }

        @Override
        public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
            if (value.match == match) {
                // The cast widens the short dma to an int before boxing to Integer.
                out.collect(new Tuple2<>((int) value.dma, 1));
            }
        }
    }

    /**
     * Emits {@code (device, 1)} for every {@link DeviceRecord1} whose {@code match} flag equals
     * the configured value and drops all other records. Grouping on field 0 and summing
     * field 1 then yields a per-device record count.
     */
    public static final class Mapper2 implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
        private final boolean match;

        Mapper2(boolean match) {
            this.match = match;
        }

        Mapper2() {
            this(false);
        }

        @Override
        public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
            if (value.match == match) {
                out.collect(new Tuple2<>(value.device, 1));
            }
        }
    }

    /** Keeps every {@link DeviceRecord} except those with {@code dma == 4}. */
    public static final class MyFilter implements FilterFunction<DeviceRecord> {
        @Override
        public boolean filter(DeviceRecord value) {
            return value.dma != 4;
        }
    }
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)