You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/08 16:58:51 UTC

[2/3] flink git commit: [FLINK-2106] [runtime] Add Outer Join drivers and Outer Merge strategies to Runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/f3dee23b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
new file mode 100644
index 0000000..a085eeb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogger implements PactTaskContext<S, OUT> {
+	
+	protected static final int PAGE_SIZE = 32 * 1024;
+	
+	private final IOManager ioManager;
+	
+	private final MemoryManager memManager;
+	
+	private final List<MutableObjectIterator<IN>> inputs;
+	
+	private final List<TypeComparator<IN>> comparators;
+	
+	private final List<UnilateralSortMerger<IN>> sorters;
+	
+	private final AbstractInvokable owner;
+	
+	private final TaskConfig taskConfig;
+	
+	private final TaskManagerRuntimeInfo taskManageInfo;
+	
+	protected final long perSortMem;
+	
+	protected final double perSortFractionMem;
+	
+	private Collector<OUT> output;
+	
+	protected int numFileHandles;
+	
+	private S stub;
+	
+	private PactDriver<S, IN> driver;
+	
+	private volatile boolean running = true;
+	
+	private ExecutionConfig executionConfig;
+	
+	private List<TypeSerializer<IN>> inputSerializers = new ArrayList<>();
+	
+	protected BinaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) {
+		if (memory < 0 || maxNumSorters < 0 || perSortMemory < 0) {
+			throw new IllegalArgumentException();
+		}
+		
+		final long totalMem = Math.max(memory, 0) + (Math.max(maxNumSorters, 0) * perSortMemory);
+		
+		this.perSortMem = perSortMemory;
+		this.perSortFractionMem = (double) perSortMemory / totalMem;
+		this.ioManager = new IOManagerAsync();
+		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem, 1) : null;
+		
+		this.inputs = new ArrayList<>();
+		this.comparators = new ArrayList<>();
+		this.sorters = new ArrayList<>();
+		
+		this.owner = new DummyInvokable();
+		this.taskConfig = new TaskConfig(new Configuration());
+		this.executionConfig = executionConfig;
+		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
+	}
+	
+	@Parameterized.Parameters
+	public static Collection<Object[]> getConfigurations() throws IOException {
+		LinkedList<Object[]> configs = new LinkedList<>();
+		
+		ExecutionConfig withReuse = new ExecutionConfig();
+		withReuse.enableObjectReuse();
+		
+		ExecutionConfig withoutReuse = new ExecutionConfig();
+		withoutReuse.disableObjectReuse();
+		
+		Object[] a = {withoutReuse};
+		configs.add(a);
+		Object[] b = {withReuse};
+		configs.add(b);
+		
+		return configs;
+	}
+	
+	public void addInput(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer) {
+		this.inputs.add(input);
+		this.sorters.add(null);
+		this.inputSerializers.add(serializer);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public void addInputSorted(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer, TypeComparator<IN> comp) throws Exception {
+		this.inputSerializers.add(serializer);
+		UnilateralSortMerger<IN> sorter = new UnilateralSortMerger<>(
+				this.memManager,
+				this.ioManager,
+				input,
+				this.owner,
+				new RuntimeSerializerFactory<>(serializer, (Class<IN>) serializer.createInstance().getClass()),
+				comp,
+				this.perSortFractionMem,
+				32,
+				0.8f
+		);
+		this.sorters.add(sorter);
+		this.inputs.add(null);
+	}
+	
+	public void addDriverComparator(TypeComparator<IN> comparator) {
+		this.comparators.add(comparator);
+	}
+	
+	public void setOutput(Collector<OUT> output) {
+		this.output = output;
+	}
+	
+	public void setOutput(List<OUT> output, TypeSerializer<OUT> outSerializer) {
+		this.output = new ListOutputCollector<>(output, outSerializer);
+	}
+	
+	public int getNumFileHandlesForSort() {
+		return numFileHandles;
+	}
+	
+	
+	public void setNumFileHandlesForSort(int numFileHandles) {
+		this.numFileHandles = numFileHandles;
+	}
+	
+	@SuppressWarnings("rawtypes")
+	public void testDriver(PactDriver driver, Class stubClass) throws Exception {
+		testDriverInternal(driver, stubClass);
+	}
+	
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception {
+		
+		this.driver = driver;
+		driver.setup(this);
+		
+		this.stub = (S) stubClass.newInstance();
+		
+		// regular running logic
+		this.running = true;
+		boolean stubOpen = false;
+		
+		try {
+			// run the data preparation
+			try {
+				driver.prepare();
+			} catch (Throwable t) {
+				throw new Exception("The data preparation caused an error: " + t.getMessage(), t);
+			}
+			
+			// open stub implementation
+			try {
+				FunctionUtils.openFunction(this.stub, getTaskConfig().getStubParameters());
+				stubOpen = true;
+			} catch (Throwable t) {
+				throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
+			}
+			
+			if (!running) {
+				return;
+			}
+			
+			// run the user code
+			driver.run();
+			
+			// close. We close here such that a regular close throwing an exception marks a task as failed.
+			if (this.running) {
+				FunctionUtils.closeFunction(this.stub);
+				stubOpen = false;
+			}
+			
+			this.output.close();
+		} catch (Exception ex) {
+			// close the input, but do not report any exceptions, since we already have another root cause
+			if (stubOpen) {
+				try {
+					FunctionUtils.closeFunction(this.stub);
+				} catch (Throwable ignored) {
+				}
+			}
+			
+			// if resettable driver invoke tear down
+			if (this.driver instanceof ResettablePactDriver) {
+				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
+				try {
+					resDriver.teardown();
+				} catch (Throwable t) {
+					throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
+				}
+			}
+			
+			// drop exception, if the task was canceled
+			if (this.running) {
+				throw ex;
+			}
+			
+		} finally {
+			driver.cleanup();
+		}
+	}
+	
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
+		driver.setup(this);
+		
+		for (int i = 0; i < iterations; i++) {
+			
+			if (i == 0) {
+				driver.initialize();
+			} else {
+				driver.reset();
+			}
+			
+			testDriver(driver, stubClass);
+			
+		}
+		
+		driver.teardown();
+	}
+	
+	public void cancel() throws Exception {
+		this.running = false;
+		
+		// compensate for races, where cancel is called before the driver is set
+		// not that this is an artifact of a bad design of this test base, where the setup
+		// of the basic properties is not separated from the invocation of the execution logic 
+		while (this.driver == null) {
+			Thread.sleep(200);
+		}
+		this.driver.cancel();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public TaskConfig getTaskConfig() {
+		return this.taskConfig;
+	}
+	
+	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return this.taskManageInfo;
+	}
+	
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+	
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return getClass().getClassLoader();
+	}
+	
+	@Override
+	public IOManager getIOManager() {
+		return this.ioManager;
+	}
+	
+	@Override
+	public MemoryManager getMemoryManager() {
+		return this.memManager;
+	}
+	
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		MutableObjectIterator<IN> in = this.inputs.get(index);
+		if (in == null) {
+			// waiting from sorter
+			try {
+				in = this.sorters.get(index).getIterator();
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Interrupted");
+			}
+			this.inputs.set(index, in);
+		}
+		
+		@SuppressWarnings("unchecked")
+		MutableObjectIterator<X> input = (MutableObjectIterator<X>) this.inputs.get(index);
+		return input;
+	}
+	
+	@Override
+	@SuppressWarnings("unchecked")
+	public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+		TypeSerializer<X> ser = (TypeSerializer<X>) this.inputSerializers.get(index);
+		return new RuntimeSerializerFactory<>(ser, (Class<X>) ser.createInstance().getClass());
+	}
+	
+	@Override
+	public <X> TypeComparator<X> getDriverComparator(int index) {
+		@SuppressWarnings("unchecked")
+		TypeComparator<X> comparator = (TypeComparator<X>) this.comparators.get(index);
+		return comparator;
+	}
+	
+	@Override
+	public S getStub() {
+		return this.stub;
+	}
+	
+	@Override
+	public Collector<OUT> getOutputCollector() {
+		return this.output;
+	}
+	
+	@Override
+	public AbstractInvokable getOwningNepheleTask() {
+		return this.owner;
+	}
+	
+	@Override
+	public String formatLogString(String message) {
+		return "Driver Tester: " + message;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@After
+	public void shutdownAll() throws Exception {
+		// 1st, shutdown sorters
+		for (UnilateralSortMerger<?> sorter : this.sorters) {
+			if (sorter != null) {
+				sorter.close();
+			}
+		}
+		this.sorters.clear();
+		
+		// 2nd, shutdown I/O
+		this.ioManager.shutdown();
+		Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
+		
+		// last, verify all memory is returned and shutdown mem manager
+		MemoryManager memMan = getMemoryManager();
+		if (memMan != null) {
+			Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
+			memMan.shutdown();
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class ListOutputCollector<OUT> implements Collector<OUT> {
+		
+		private final List<OUT> output;
+		private final TypeSerializer<OUT> serializer;
+		
+		public ListOutputCollector(List<OUT> outputList, TypeSerializer<OUT> serializer) {
+			this.output = outputList;
+			this.serializer = serializer;
+		}
+		
+		
+		@Override
+		public void collect(OUT record) {
+			this.output.add(serializer.copy(record));
+		}
+		
+		@Override
+		public void close() {
+		}
+	}
+	
+	public static final class CountingOutputCollector<OUT> implements Collector<OUT> {
+		
+		private int num;
+		
+		@Override
+		public void collect(OUT record) {
+			this.num++;
+		}
+		
+		@Override
+		public void close() {
+		}
+		
+		public int getNumberOfRecords() {
+			return this.num;
+		}
+	}
+}