Posted to dev@flink.apache.org by "David Dreyfus (JIRA)" <ji...@apache.org> on 2017/10/25 19:31:00 UTC

[jira] [Created] (FLINK-7926) Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.

David Dreyfus created FLINK-7926:
------------------------------------

             Summary: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
                 Key: FLINK-7926
                 URL: https://issues.apache.org/jira/browse/FLINK-7926
             Project: Flink
          Issue Type: Bug
          Components: Distributed Coordination
    Affects Versions: 1.3.2
         Environment: standalone execution on MacBook Pro
in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3.
taskmanager.heap.mb = 1024
taskmanager.memory.preallocate = false
taskmanager.numberOfTaskSlots = 3
            Reporter: David Dreyfus


The following exception is thrown as the number of tasks increases:
{code:java}
10/25/2017 14:26:16     LeftOuterJoin(Join at with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
        at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
        at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
        at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

10/25/2017 14:26:16     Job execution switched to status FAILING.
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
        at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
        at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
        at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
        at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
{code}

I run with the following command:

{code:java}
flink run -c com.northbay.union.Union3 ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar --left /Users/user/Documents/Flink/Quickstart/Files/manysmall --right /Users/user/Documents/Flink/Quickstart/Files/manysmall --output /tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50
{code}

The files submitted are all CSV with the schema (int, string, short), matching the DeviceRecord fields (device, fingerprint, dma).
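
For illustration, a few hypothetical rows in that shape (the actual test data is not part of this report):

{code}
1001,a1b2c3d4e5,501
1002,f6a7b8c9d0,501
1003,e1f2a3b4c5,4
{code}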

This is the code (break it out into 3 separate files before using; the file boundaries are marked with comments below). The idea behind this test is to hash-join pairs of files and union their results.

{code:java}
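// ---- File 1 of 3: DeviceRecord1.java ----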
package com.northbay.hashcount;

public class DeviceRecord1 {
    public int device;
    public String fingerprint;
    public short dma;
    public boolean match;
    public DeviceRecord1() {
        
    }
    public DeviceRecord1(DeviceRecord old) {
        this.device = old.device;
        this.fingerprint = old.fingerprint;
        this.dma = old.dma;
        this.match = false;
    }
    public DeviceRecord1(int device, String fingerprint, short dma) {
        this.device = device;
        this.fingerprint = fingerprint;
        this.dma = dma;
        this.match = false;
    }
}
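
// ---- File 2 of 3: DeviceRecord.java ----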
package com.northbay.hashcount;

public class DeviceRecord {
    public int device;
    public String fingerprint;
    public short dma;
    public DeviceRecord() {
        
    }
    public DeviceRecord(DeviceRecord old) {
        this.device = old.device;
        this.fingerprint = old.fingerprint;
        this.dma = old.dma;
    }
    public DeviceRecord(int device, String fingerprint, short dma) {
        this.device = device;
        this.fingerprint = fingerprint;
        this.dma = dma;
    }
}
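// ---- File 3 of 3: Union2.java ----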
package com.northbay.union;

import com.northbay.hashcount.DeviceRecord;
import com.northbay.hashcount.DeviceRecord1;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

@SuppressWarnings("serial")
public class Union2 {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************
    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        LinkedList<DataSet<DeviceRecord1>> joined = new LinkedList<>();
        DataSet<DeviceRecord1> joinedData = null;
        if (params.has("left") && params.has("right")) {
            for (int i = 0; i < Integer.decode(params.get("filecount")); i++) {
                DataSet<DeviceRecord> l, r;
                DataSet<DeviceRecord1> j;
                // read the text file from given input path
                l = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
                    .pojoType(DeviceRecord.class, "device", "fingerprint", "dma");
                // read the text file from given input path
                r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
                    .pojoType(DeviceRecord.class, "device", "fingerprint", "dma")
                    .filter(new MyFilter());
                j = l.leftOuterJoin(r)
                    .where("fingerprint")
                    .equalTo("fingerprint")
                    .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
                        @Override
                        public void join(DeviceRecord left, DeviceRecord right, Collector<DeviceRecord1> out) throws Exception {
                            if (right == null) {
                                out.collect(new DeviceRecord1(left));
                            } else {
                                DeviceRecord1 d = new DeviceRecord1(left);
                                d.match = true;
                                out.collect(d);
                            }

                        }
                    });
                
                if (joinedData == null) {
                    joinedData = j;
                } else {
                    joinedData = joinedData.union(j);
                }
                joined.add(j);
            }
        }
        // Count by DMA
        DataSet<Tuple2<Integer, Integer>> counts = null;
        if (joinedData != null) {
            counts = joinedData
                .flatMap(new Mapper(false))
                // group by the tuple field "0" (DMA -- it's been remapped) and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        }

        // emit result
        if (counts != null) {
            if (params.has("output")) {
                counts.writeAsCsv(params.get("output"), "\n", ", ");
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();

            }
        }
        // Count by Device
        DataSet<Tuple2<Integer, Integer>> counts2 = null;
        if (joinedData != null) {
            counts2 = joinedData
                .flatMap(new Mapper2(true))
                // group by the tuple field "0" (Device -- it's been remapped) and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        }

        // emit result
        if (counts2 != null) {
            if (params.has("output2")) {
                counts2.writeAsCsv(params.get("output2"), "\n", ", ");
            } else {
                System.out.println("Printing result to stdout. Use --output2 to specify output path.");
                counts2.print();

            }
        }
        // execute the program; note that print() runs eagerly, so execute()
        // is only required for the CSV file sinks
        env.execute("Union2");
    }

    /**
     * Implements a user-defined FlatMapFunction that takes a device record,
     * pulls out the DMA, and emits a ({@code Tuple2<Integer, Integer>}) count
     * pair when the record's match flag equals the configured value.
     */
    public static final class Mapper implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {

        private boolean match = false;

        Mapper(boolean match) {
            this.match = match;
        }

        Mapper() {
            this(false);
        }

        @Override
        public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
            if (value.match == match) {
                out.collect(new Tuple2<>((int) value.dma, 1));
            }
        }
    }

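    /**
     * Same as {@link Mapper}, but emits the device instead of the DMA as the key.
     */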
    public static final class Mapper2 implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {

        private boolean match = false;

        Mapper2(boolean match) {
            this.match = match;
        }

        Mapper2() {
            this(false);
        }

        @Override
        public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
            if (value.match == match) {
                out.collect(new Tuple2<>((int) value.device, 1));
            }
        }
    }

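    /**
     * Filters out records with dma == 4 so that some left-side rows
     * have no match in the outer join.
     */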
    public static final class MyFilter
        implements FilterFunction<DeviceRecord> {

        @Override
        public boolean filter(DeviceRecord value) {
            return value.dma != 4;
        }
    }
}
{code}
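
A possible workaround while this is investigated (a sketch only, not verified to avoid the bug) may be to steer the optimizer away from the hybrid hash join with a JoinHint, e.g. forcing a sort-merge strategy:

{code:java}
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;

// Ask for a repartition sort-merge join instead of letting the optimizer
// pick the hybrid hash join (assumption: this sidesteps the spill path).
j = l.leftOuterJoin(r, JoinHint.REPARTITION_SORT_MERGE)
    .where("fingerprint")
    .equalTo("fingerprint")
    .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
        @Override
        public void join(DeviceRecord left, DeviceRecord right, Collector<DeviceRecord1> out) {
            DeviceRecord1 d = new DeviceRecord1(left);
            d.match = (right != null);
            out.collect(d);
        }
    });
{code}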
