Posted to issues@flink.apache.org by "Till Rohrmann (Jira)" <ji...@apache.org> on 2019/11/08 10:56:00 UTC
[jira] [Updated] (FLINK-7926) Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
[ https://issues.apache.org/jira/browse/FLINK-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann updated FLINK-7926:
---------------------------------
    Component/s: API / DataSet  (was: Runtime / Coordination)
> Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
> ---------------------------------------------------------------------------------
>
> Key: FLINK-7926
> URL: https://issues.apache.org/jira/browse/FLINK-7926
> Project: Flink
> Issue Type: Bug
> Components: API / DataSet
> Affects Versions: 1.3.2
> Environment: standalone execution on MacBook Pro
> in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3.
> taskmanager.heap.mb = 1024
> taskmanager.memory.preallocate = false
> taskmanager.numberOfTaskSlots = 3
> Reporter: David Dreyfus
> Priority: Major
>
> The following exception is thrown as the number of tasks increases.
> {code:java}
> 10/25/2017 14:26:16 LeftOuterJoin(Join at with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED
> java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
> at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
> at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> 10/25/2017 14:26:16 Job execution switched to status FAILING.
> java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
> at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
> at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
> at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
> at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> {code}
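> As far as I can tell from the stack trace, the error is raised by a sanity check in {{HashPartition.spillPartition()}}: the hash table refuses to spill a partition that currently holds fewer than two memory segments, presumably because one segment has to stay behind as the partition's active write buffer while the rest are flushed to disk. A minimal sketch of that invariant (my own illustration, not a copy of the Flink source):
> {code:java}
> // Illustration of the invariant that trips here (not Flink's actual code):
> // a partition may only be spilled if it owns at least two in-memory buffers,
> // since one buffer must remain in memory as the partition's write buffer.
> final class SpillInvariant {
>     static void checkSpillable(int numOccupiedMemorySegments) {
>         if (numOccupiedMemorySegments < 2) {
>             throw new RuntimeException("Bug in Hybrid Hash Join: "
>                     + "Request to spill a partition with less than two buffers.");
>         }
>     }
>
>     public static void main(String[] args) {
>         checkSpillable(1); // produces the same message as in the log above
>     }
> }
> {code}
> That would fit the observation that the failure only shows up as the number of tasks grows: with more concurrent hash joins sharing the task manager's managed memory, each hash table presumably ends up with too few buffers per partition to satisfy that check.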
> I run with the following command:
> {code:java}
> flink run -c com.northbay.union.Union3 ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar --left /Users/user/Documents/Flink/Quickstart/Files/manysmall --right /Users/user/Documents/Flink/Quickstart/Files/manysmall --output /tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50
> {code}
> The files submitted are all CSV with rows of (int, string, short).
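> For illustration, a row in one of these files looks roughly like this (values made up):
> {code}
> 1234567,3a7bd3e2360a3d29eea436fcfb7e44c8,501
> {code}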
> This is the code (break it out into 3 separate files before using). The idea behind this test is to compare (hash-join) pairs of files and combine their results.
> {code:java}
> package com.northbay.hashcount;
> public class DeviceRecord1 {
> public int device;
> public String fingerprint;
> public short dma;
> public boolean match;
> public DeviceRecord1() {
>
> }
> public DeviceRecord1(DeviceRecord old) {
> this.device = old.device;
> this.fingerprint = old.fingerprint;
> this.dma = old.dma;
> this.match = false;
> }
> public DeviceRecord1(int device, String fingerprint, short dma) {
> this.device = device;
> this.fingerprint = fingerprint;
> this.dma = dma;
> this.match = false;
> }
> }
> package com.northbay.hashcount;
> public class DeviceRecord {
> public int device;
> public String fingerprint;
> public short dma;
> public DeviceRecord() {
>
> }
> public DeviceRecord(DeviceRecord old) {
> this.device = old.device;
> this.fingerprint = old.fingerprint;
> this.dma = old.dma;
> }
> public DeviceRecord(int device, String fingerprint, short dma) {
> this.device = device;
> this.fingerprint = fingerprint;
> this.dma = dma;
> }
> }
> package com.northbay.union;
> import com.northbay.hashcount.DeviceRecord;
> import com.northbay.hashcount.DeviceRecord1;
> import java.util.LinkedList;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.util.Collector;
> @SuppressWarnings("serial")
> public class Union2 {
> // *************************************************************************
> // PROGRAM
> // *************************************************************************
> public static void main(String[] args) throws Exception {
> final ParameterTool params = ParameterTool.fromArgs(args);
> // set up the execution environment
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> // make parameters available in the web interface
> env.getConfig().setGlobalJobParameters(params);
> // get input data
> LinkedList<DataSet<DeviceRecord1>> joined = new LinkedList<>();
> DataSet<DeviceRecord1> joinedData = null;
> if (params.has("left") && params.has("right")) {
> for (int i = 0; i < Integer.decode(params.get("filecount")); i++) {
> DataSet<DeviceRecord> l, r;
> DataSet<DeviceRecord1> j;
> // read the text file from given input path
> l = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
> .pojoType(DeviceRecord.class, "device", "fingerprint", "dma");
> // read the text file from given input path
> r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
> .pojoType(DeviceRecord.class, "device", "fingerprint", "dma")
> .filter(new MyFilter());
> j = l.leftOuterJoin(r)
> .where("fingerprint")
> .equalTo("fingerprint")
> .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
> @Override
> public void join(DeviceRecord left, DeviceRecord right, Collector<DeviceRecord1> out) throws Exception {
> if (right == null) {
> out.collect(new DeviceRecord1(left));
> } else {
> DeviceRecord1 d = new DeviceRecord1(left);
> d.match = true;
> out.collect(d);
> }
> }
> });
>
> if (joinedData == null) {
> joinedData = j;
> } else {
> joinedData = joinedData.union(j);
> }
> joined.add(j);
> }
> }
> // Count by DMA
> DataSet<Tuple2<Integer, Integer>> counts = null;
> if (joinedData != null) {
> counts = joinedData
> .flatMap(new Mapper(false))
> // group by the tuple field "0" (DMA -- it's been remapped) and sum up tuple field "1"
> .groupBy(0)
> .sum(1);
> }
> // emit result
> if (counts != null) {
> if (params.has("output")) {
> counts.writeAsCsv(params.get("output"), "\n", ", ");
> } else {
> System.out.println("Printing result to stdout. Use --output to specify output path.");
> counts.print();
> }
> }
> // Count by Device
> DataSet<Tuple2<Integer, Integer>> counts2 = null;
> if (joinedData != null) {
> counts2 = joinedData
> .flatMap(new Mapper2(true))
> // group by the tuple field "0" (Device -- it's been remapped) and sum up tuple field "1"
> .groupBy(0)
> .sum(1);
> }
> // emit result
> if (counts2 != null) {
> if (params.has("output2")) {
> counts2.writeAsCsv(params.get("output2"), "\n", ", ");
> } else {
> System.out.println("Printing result to stdout. Use --output2 to specify output path.");
> counts2.print();
> }
> }
> // execute program
> env.execute("Union2");
> }
> /**
> * Implements a user-defined FlatMapFunction that counts records per DMA.
> * The function takes a device record, pulls out the DMA, and emits a
> * ({@code Tuple2<Integer, Integer>}) of (dma, 1).
> */
> public static final class Mapper implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
> private boolean match = false;
> Mapper(boolean match) {
> this.match = match;
> }
> Mapper() {
> this(false);
> }
> @Override
> public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
> if (value.match == match) {
> out.collect(new Tuple2<>((int) value.dma, 1));
> }
> }
> }
> public static final class Mapper2 implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
> private boolean match = false;
> Mapper2(boolean match) {
> this.match = match;
> }
> Mapper2() {
> this(false);
> }
> @Override
> public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
> if (value.match == match) {
> out.collect(new Tuple2<>((int) value.device, 1));
> }
> }
> }
> public static final class MyFilter implements FilterFunction<DeviceRecord> {
> @Override
> public boolean filter(DeviceRecord value) {
> return value.dma != 4;
> }
> }
> }
> {code}
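> In case it helps with reproducing this, here is a rough sketch of a generator for input files in the format described above (this generator is not part of the job; the directory, row counts, and values are made up):
> {code:java}
> package com.northbay.union;
>
> import java.io.IOException;
> import java.io.PrintWriter;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.Paths;
> import java.util.Random;
>
> // Writes <filecount> CSV files named 0..N-1 into a directory, each row being
> // (int device, string fingerprint, short dma) -- the format the job above reads
> // with readCsvFile(...).pojoType(DeviceRecord.class, "device", "fingerprint", "dma").
> public class GenerateInput {
>     public static void main(String[] args) throws IOException {
>         Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/manysmall");
>         int fileCount = args.length > 1 ? Integer.parseInt(args[1]) : 50;
>         int rowsPerFile = args.length > 2 ? Integer.parseInt(args[2]) : 10000;
>
>         Files.createDirectories(dir);
>         Random rnd = new Random(42);
>         for (int i = 0; i < fileCount; i++) {
>             try (PrintWriter out = new PrintWriter(Files.newBufferedWriter(dir.resolve(Integer.toString(i))))) {
>                 for (int row = 0; row < rowsPerFile; row++) {
>                     int device = rnd.nextInt(1000000);
>                     String fingerprint = Long.toHexString(rnd.nextLong());
>                     short dma = (short) rnd.nextInt(1000);
>                     out.println(device + "," + fingerprint + "," + dma);
>                 }
>             }
>         }
>     }
> }
> {code}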
--
This message was sent by Atlassian Jira
(v8.3.4#803005)