You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2015/09/03 16:51:32 UTC

[GitHub] flink pull request: [FLINK-2106] [runtime] add Left-, Right- and F...

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1052#discussion_r38654270
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskTest.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.flink.runtime.operators;
    +
    +import com.google.common.base.Throwables;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.IntComparator;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.runtime.operators.testutils.BinaryOperatorTestBase;
    +import org.apache.flink.runtime.operators.testutils.DelayingIterator;
    +import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
    +import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
    +import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
    +import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
    +import org.apache.flink.util.Collector;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.fail;
    +
    +public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<FlatJoinFunction<Tuple2<Integer, Integer>,
    +		Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    +	
    +	private static final long HASH_MEM = 6 * 1024 * 1024;
    +	
    +	private static final long SORT_MEM = 3 * 1024 * 1024;
    +	
    +	private static final int NUM_SORTER = 2;
    +	
    +	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
    +	
    +	private final double bnljn_frac;
    +	
    +	private final DriverStrategy driverStrategy;
    +	
    +	@SuppressWarnings("unchecked")
    +	private final TypeComparator<Tuple2<Integer, Integer>> comparator1 = new TupleComparator<>(
    +			new int[]{0},
    +			new TypeComparator<?>[]{new IntComparator(true)},
    +			new TypeSerializer<?>[]{IntSerializer.INSTANCE}
    +	);
    +	
    +	@SuppressWarnings("unchecked")
    +	private final TypeComparator<Tuple2<Integer, Integer>> comparator2 = new TupleComparator<>(
    +			new int[]{0},
    +			new TypeComparator<?>[]{new IntComparator(true)},
    +			new TypeSerializer<?>[]{IntSerializer.INSTANCE}
    +	);
    +	
    +	private final List<Tuple2<Integer, Integer>> outList = new ArrayList<>();
    +	
    +	private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<>(
    +			(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
    +			new TypeSerializer<?>[]{IntSerializer.INSTANCE, IntSerializer.INSTANCE}
    +	);
    +	
    +	
    +	public AbstractOuterJoinTaskTest(ExecutionConfig config, DriverStrategy driverStrategy) {
    +		super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
    +		bnljn_frac = (double) BNLJN_MEM / this.getMemoryManager().getMemorySize();
    +		this.driverStrategy = driverStrategy;
    +	}
    +	
    +	@Test
    +	public void testSortBoth1OuterJoinTask() throws Exception {
    +		final int keyCnt1 = 20;
    +		final int valCnt1 = 1;
    +		
    +		final int keyCnt2 = 10;
    +		final int valCnt2 = 2;
    +		
    +		testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +	}
    +	
    +	@Test
    +	public void testSortBoth2OuterJoinTask() throws Exception {
    +		final int keyCnt1 = 20;
    +		final int valCnt1 = 1;
    +		
    +		final int keyCnt2 = 20;
    +		final int valCnt2 = 1;
    +		
    +		testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +	}
    +	
    +	@Test
    +	public void testSortBoth3OuterJoinTask() throws Exception {
    +		int keyCnt1 = 20;
    +		int valCnt1 = 1;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 20;
    +		
    +		testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +	}
    +	
    +	@Test
    +	public void testSortBoth4OuterJoinTask() throws Exception {
    +		int keyCnt1 = 20;
    +		int valCnt1 = 20;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 1;
    +		
    +		testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +	}
    +	
    +	@Test
    +	public void testSortBoth5OuterJoinTask() throws Exception {
    +		int keyCnt1 = 20;
    +		int valCnt1 = 20;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 20;
    +		
    +		testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +	}
    +	
    +	@Test
    +	public void testSortBoth6OuterJoinTask() throws Exception {
    +		int keyCnt1 = 10;
    +		int valCnt1 = 1;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 2;
    +		
    +		testSortBothOuterJoinTask(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +	}
    +	
    +	private void testSortBothOuterJoinTask(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2) throws Exception {
    +		setOutput(this.outList, this.serializer);
    +		addDriverComparator(this.comparator1);
    +		addDriverComparator(this.comparator2);
    +		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
    +		getTaskConfig().setDriverStrategy(this.driverStrategy);
    +		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
    +		setNumFileHandlesForSort(4);
    +		
    +		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
    +		
    +		addInputSorted(new UniformIntTupleGenerator(keyCnt1, valCnt1, false), this.serializer, this.comparator1.duplicate());
    +		addInputSorted(new UniformIntTupleGenerator(keyCnt2, valCnt2, false), this.serializer, this.comparator2.duplicate());
    +		testDriver(testTask, MockJoinStub.class);
    +		
    +		final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +		
    +		Assert.assertTrue("Result set size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
    +		
    +		this.outList.clear();
    +	}
    +	
    +	@Test
    +	public void testSortFirstOuterJoinTask() throws Exception {
    +		int keyCnt1 = 20;
    +		int valCnt1 = 20;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 20;
    +		
    +		setOutput(this.outList, this.serializer);
    +		addDriverComparator(this.comparator1);
    +		addDriverComparator(this.comparator2);
    +		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
    +		getTaskConfig().setDriverStrategy(this.driverStrategy);
    +		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
    +		setNumFileHandlesForSort(4);
    +		
    +		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
    +		
    +		addInputSorted(new UniformIntTupleGenerator(keyCnt1, valCnt1, false), this.serializer, this.comparator1.duplicate());
    +		addInput(new UniformIntTupleGenerator(keyCnt2, valCnt2, true), this.serializer);
    +		testDriver(testTask, MockJoinStub.class);
    +		
    +		final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +		
    +		Assert.assertTrue("Result set size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
    +		
    +		this.outList.clear();
    +	}
    +	
    +	@Test
    +	public void testSortSecondOuterJoinTask() throws Exception {
    +		int keyCnt1 = 20;
    +		int valCnt1 = 20;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 20;
    +		
    +		setOutput(this.outList, this.serializer);
    +		addDriverComparator(this.comparator1);
    +		addDriverComparator(this.comparator2);
    +		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
    +		getTaskConfig().setDriverStrategy(this.driverStrategy);
    +		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
    +		setNumFileHandlesForSort(4);
    +		
    +		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
    +		
    +		addInput(new UniformIntTupleGenerator(keyCnt1, valCnt1, true), this.serializer);
    +		addInputSorted(new UniformIntTupleGenerator(keyCnt2, valCnt2, false), this.serializer, this.comparator2.duplicate());
    +		testDriver(testTask, MockJoinStub.class);
    +		
    +		final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +		
    +		Assert.assertTrue("Result set size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
    +		
    +		this.outList.clear();
    +	}
    +	
    +	@Test
    +	public void testMergeOuterJoinTask() throws Exception {
    +		int keyCnt1 = 20;
    +		int valCnt1 = 20;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 20;
    +		
    +		setOutput(this.outList, this.serializer);
    +		addDriverComparator(this.comparator1);
    +		addDriverComparator(this.comparator2);
    +		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
    +		getTaskConfig().setDriverStrategy(this.driverStrategy);
    +		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
    +		setNumFileHandlesForSort(4);
    +		
    +		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
    +		
    +		addInput(new UniformIntTupleGenerator(keyCnt1, valCnt1, true), this.serializer);
    +		addInput(new UniformIntTupleGenerator(keyCnt2, valCnt2, true), this.serializer);
    +		
    +		testDriver(testTask, MockJoinStub.class);
    +		
    +		final int expCnt = calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2);
    +		
    +		Assert.assertTrue("Result set size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
    +		
    +		this.outList.clear();
    +	}
    +	
    +	@Test(expected = ExpectedTestException.class)
    +	public void testFailingOuterJoinTask() throws Exception {
    +		int keyCnt1 = 20;
    +		int valCnt1 = 20;
    +		
    +		int keyCnt2 = 20;
    +		int valCnt2 = 20;
    +		
    +		setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
    +		addDriverComparator(this.comparator1);
    +		addDriverComparator(this.comparator2);
    +		getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
    +		getTaskConfig().setDriverStrategy(this.driverStrategy);
    +		getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
    +		setNumFileHandlesForSort(4);
    +		
    +		final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = getOuterJoinDriver();
    +		
    +		addInput(new UniformIntTupleGenerator(keyCnt1, valCnt1, true), this.serializer);
    +		addInput(new UniformIntTupleGenerator(keyCnt2, valCnt2, true), this.serializer);
    +		
    +		testDriver(testTask, MockFailingJoinStub.class);
    +		Assert.fail("Driver did not forward Exception.");
    --- End diff --
    
    You can remove this assertion.
    The method is annotated with `@Test(expected = ExpectedTestException.class)`, so JUnit already fails the test if the method does not throw an `ExpectedTestException`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---