You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/10 01:42:21 UTC
[4/9] git commit: [FLINK-1075] Removed the AsynchronousPartialSorter.
[FLINK-1075] Removed the AsynchronousPartialSorter.
This closes #104
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/64510b6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/64510b6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/64510b6a
Branch: refs/heads/release-0.6.1
Commit: 64510b6a5cb332b3c2b99471b6f6e8608854ef45
Parents: 8af40c3
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 28 18:28:20 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:43:53 2014 +0200
----------------------------------------------------------------------
.../operators/GroupReduceCombineDriver.java | 125 +++++---
.../runtime/operators/ReduceCombineDriver.java | 1 -
.../sort/AsynchronousPartialSorter.java | 207 -------------
.../AsynchronousPartialSorterCollector.java | 101 ------
.../sort/AsynchonousPartialSorterITCase.java | 306 -------------------
5 files changed, 89 insertions(+), 651 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index f786c56..0452ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -23,15 +23,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import java.io.IOException;
+import java.util.List;
+
/**
* Combine operator, standalone (not chained)
* <p>
@@ -43,16 +49,26 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);
-
+ /** Fixed-length records whose length does not exceed this threshold will be sorted in place, if possible. */
+ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
-
- private CloseableInputProvider<T> input;
- private TypeSerializerFactory<T> serializerFactory;
+ private InMemorySorter<T> sorter;
+
+ private FlatCombineFunction<T> combiner;
+
+ private TypeSerializer<T> serializer;
private TypeComparator<T> comparator;
-
- private volatile boolean running;
+
+ private QuickSort sortAlgo = new QuickSort();
+
+ private MemoryManager memManager;
+
+ private Collector<T> output;
+
+ private volatile boolean running = true;
// ------------------------------------------------------------------------
@@ -81,55 +97,92 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
@Override
public void prepare() throws Exception {
- final TaskConfig config = this.taskContext.getTaskConfig();
- final DriverStrategy ls = config.getDriverStrategy();
+ if(this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_GROUP_COMBINE){
+ throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for " +
+ "group reduce combinder.");
+ }
- final MemoryManager memoryManager = this.taskContext.getMemoryManager();
+ this.memManager = this.taskContext.getMemoryManager();
+ final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
- final MutableObjectIterator<T> in = this.taskContext.getInput(0);
- this.serializerFactory = this.taskContext.getInputSerializer(0);
+ final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
+ this.serializer = serializerFactory.getSerializer();
this.comparator = this.taskContext.getInputComparator(0);
-
- switch (ls) {
- case SORTED_GROUP_COMBINE:
- this.input = new AsynchronousPartialSorter<T>(memoryManager, in, this.taskContext.getOwningNepheleTask(),
- this.serializerFactory, this.comparator.duplicate(), config.getRelativeMemoryDriver());
- break;
- // obtain and return a grouped iterator from the combining sort-merger
- default:
- throw new RuntimeException("Invalid local strategy provided for CombineTask.");
+ this.combiner = this.taskContext.getStub();
+ this.output = this.taskContext.getOutputCollector();
+
+ final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(),
+ numMemoryPages);
+
+ // instantiate a fixed-length in-place sorter, if possible, otherwise the out-of-place sorter
+ if (this.comparator.supportsSerializationWithKeyNormalization() &&
+ this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
+ {
+ this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+ } else {
+ this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
}
}
@Override
public void run() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug(this.taskContext.formatLogString("Preprocessing done, iterator obtained."));
+ LOG.debug("Combiner starting.");
}
- final KeyGroupedIterator<T> iter = new KeyGroupedIterator<T>(this.input.getIterator(),
- this.serializerFactory.getSerializer(), this.comparator);
+ final MutableObjectIterator<T> in = this.taskContext.getInput(0);
+ final TypeSerializer<T> serializer = this.serializer;
+
+ T value = serializer.createInstance();
+
+ while (running && (value = in.next(value)) != null) {
+
+ // try writing to the sorter first
+ if (this.sorter.write(value)) {
+ continue;
+ }
- // cache references on the stack
- final FlatCombineFunction<T> stub = this.taskContext.getStub();
- final Collector<T> output = this.taskContext.getOutputCollector();
+ // do the actual sorting, combining, and data writing
+ sortAndCombine();
+ this.sorter.reset();
- // run stub implementation
- while (this.running && iter.nextKey()) {
- stub.combine(iter.getValues(), output);
+ // write the value again
+ if (!this.sorter.write(value)) {
+ throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+ }
+ }
+
+ // sort, combine, and send the final batch
+ sortAndCombine();
+ }
+
+ private void sortAndCombine() throws Exception {
+ final InMemorySorter<T> sorter = this.sorter;
+
+ if (!sorter.isEmpty()) {
+ this.sortAlgo.sort(sorter);
+
+ final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
+ this.comparator);
+
+ final FlatCombineFunction<T> combiner = this.combiner;
+ final Collector<T> output = this.output;
+
+ // iterate over key groups
+ while (this.running && keyIter.nextKey()) {
+ combiner.combine(keyIter.getValues(), output);
+ }
}
}
@Override
public void cleanup() throws Exception {
- if (this.input != null) {
- this.input.close();
- this.input = null;
- }
+ this.memManager.release(this.sorter.dispose());
}
@Override
public void cancel() {
this.running = false;
+ this.memManager.release(this.sorter.dispose());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 87cea30..6b18bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -109,7 +109,6 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
// instantiate the serializer / comparator
final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
- this.serializer = serializerFactory.getSerializer();
this.comparator = this.taskContext.getInputComparator(0);
this.serializer = serializerFactory.getSerializer();
this.reducer = this.taskContext.getStub();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
deleted file mode 100644
index 03794ff..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * The {@link AsynchronousPartialSorter} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- */
-public class AsynchronousPartialSorter<E> extends UnilateralSortMerger<E> {
-
- private BufferQueueIterator bufferIterator;
-
- // ------------------------------------------------------------------------
- // Constructor
- // ------------------------------------------------------------------------
-
- /**
- *
- *
- * @param memoryManager The memory manager from which to allocate the memory.
- * @param input The input that is sorted by this sorter.
- * @param parentTask The parent task, which owns all resources used by this sorter.
- * @param serializerFactory The type serializer.
- * @param comparator The type comparator establishing the order relation.
- * @param memoryFraction The fraction of memory dedicated to sorting.
- *
- * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
- * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
- * perform the sort.
- */
- public AsynchronousPartialSorter(MemoryManager memoryManager,
- MutableObjectIterator<E> input, AbstractInvokable parentTask,
- TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
- double memoryFraction)
- throws IOException, MemoryAllocationException
- {
- super(memoryManager, null, input, parentTask, serializerFactory, comparator, memoryFraction, 1, 2, 0.0f, true);
- }
-
-
- public void close() {
- // make a best effort to close the buffer iterator
- try {
- if (this.bufferIterator != null) {
- this.bufferIterator.close();
- this.bufferIterator = null;
- }
- }
- finally {
- super.close();
- }
- }
-
- /*
- * This method does not actually create a spilling thread, but grabs the circular queues and creates the
- * iterator that reads from the sort buffers in turn.
- */
- @Override
- protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
- AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager,
- TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
- List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles)
- {
- this.bufferIterator = new BufferQueueIterator(queues);
- setResultIterator(this.bufferIterator);
-
- return null;
- }
-
- // ------------------------------------------------------------------------
-
- private final class BufferQueueIterator implements MutableObjectIterator<E> {
-
- private final CircularQueues<E> queues;
-
- private CircularElement<E> currentElement;
-
- private MutableObjectIterator<E> currentIterator;
-
- private volatile boolean closed = false;
-
-
- protected BufferQueueIterator(CircularQueues<E> queues) {
- this.queues = queues;
- }
-
-
- @Override
- public E next(final E reuse) throws IOException {
- E result;
- if (this.currentIterator != null && ((result = this.currentIterator.next(reuse)) != null)) {
- return result;
- }
- else if (this.closed) {
- throw new IllegalStateException("The sorter has been closed.");
- }
- else {
- if (AsynchronousPartialSorter.this.iteratorException != null) {
- throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
- }
-
- while (true) {
- if (this.currentElement == endMarker()) {
- return null;
- }
- else if (this.currentElement != null) {
- // return the current element to the empty queue
- this.currentElement.buffer.reset();
- this.queues.empty.add(this.currentElement);
- }
-
- // get a new element
- try {
- this.currentElement = null;
- while (!this.closed && this.currentElement == null) {
- this.currentElement = this.queues.spill.poll(1000, TimeUnit.MILLISECONDS);
- }
- if (AsynchronousPartialSorter.this.iteratorException != null) {
- throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
- }
-
- if (this.currentElement == endMarker()) {
- // signals the end, no more buffers will come
- // release the memory first before returning
- releaseSortBuffers();
- return null;
- }
- if (this.currentElement == spillingMarker()) {
- this.currentElement = null;
- continue;
- }
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Iterator was interrupted getting the next sortedBuffer.");
- }
-
- this.currentIterator = this.currentElement.buffer.getIterator();
- if ((result = this.currentIterator.next(reuse)) != null) {
- return result;
- }
- this.currentIterator = null;
- }
- }
- }
-
- public void close() {
- synchronized (this) {
- if (this.closed) {
- return;
- }
- this.closed = true;
- }
-
- if (this.currentElement != null) {
- this.queues.empty.add(this.currentElement);
- this.currentElement = null;
- }
- if (this.currentIterator != null) {
- this.currentIterator = null;
- }
- }
-
- private final void releaseSortBuffers() {
- while (!this.queues.empty.isEmpty()) {
- final CircularElement<E> elem = this.queues.empty.poll();
- if (elem != null) {
- final InMemorySorter<E> sorter = elem.buffer;
- final List<MemorySegment> segments = sorter.dispose();
- AsynchronousPartialSorter.this.memoryManager.release(segments);
- }
- }
- }
-
- };
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
deleted file mode 100644
index a41dbf1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The {@link AsynchronousPartialSorterCollector} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- * <p>
- * In contract to the {@link AsynchronousPartialSorter}, this class has no dedicated reading thread that
- * pulls records from an iterator, but offers a collector into which data to be sorted is pushed.
- *
- */
-public class AsynchronousPartialSorterCollector<E> extends AsynchronousPartialSorter<E> {
-
- private InputDataCollector<E> collector;
-
- // ------------------------------------------------------------------------
- // Constructor
- // ------------------------------------------------------------------------
-
- /**
- * @param memoryManager The memory manager from which to allocate the memory.
- * @param parentTask The parent task, which owns all resources used by this sorter.
- * @param serializerFactory The type serializer.
- * @param comparator The type comparator establishing the order relation.
- * @param memoryFraction The fraction of memory dedicated to sorting.
- *
- * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
- * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
- * perform the sort.
- */
- public AsynchronousPartialSorterCollector(MemoryManager memoryManager,
- AbstractInvokable parentTask,
- TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
- double memoryFraction)
- throws IOException, MemoryAllocationException
- {
- super(memoryManager, null, parentTask, serializerFactory, comparator,
- memoryFraction);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets the collector that writes into the sort buffers.
- *
- * @return The collector that writes into the sort buffers.
- */
- public InputDataCollector<E> getInputCollector() {
- return this.collector;
- }
-
- @Override
- protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler,
- MutableObjectIterator<E> reader, CircularQueues<E> queues, AbstractInvokable parentTask,
- TypeSerializer<E> serializer, long startSpillingBytes)
- {
- this.collector = new InputDataCollector<E>(queues, startSpillingBytes);
- return null;
- }
-
-
- public void close() {
- try {
- if (this.collector != null) {
- this.collector.close();
- }
- }
- finally {
- super.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
deleted file mode 100644
index 90fe59f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.sort.ExceptionHandler;
-import org.apache.flink.runtime.operators.sort.Sorter;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class AsynchonousPartialSorterITCase {
-
- private static final Log LOG = LogFactory.getLog(AsynchonousPartialSorterITCase.class);
-
- private static final long SEED = 649180756312423613L;
-
- private static final int KEY_MAX = Integer.MAX_VALUE;
-
- private static final Value VAL = new Value("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
-
- private static final int VALUE_LENGTH = 114;
-
- public static final int MEMORY_SIZE = 1024 * 1024 * 32;
-
- private final AbstractInvokable parentTask = new DummyInvokable();
-
- private IOManager ioManager;
-
- private MemoryManager memoryManager;
-
- private TypeSerializerFactory<Record> serializer;
-
- private TypeComparator<Record> comparator;
-
-
- @SuppressWarnings("unchecked")
- @Before
- public void beforeTest()
- {
- this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1);
- this.ioManager = new IOManager();
- this.serializer = RecordSerializerFactory.get();
- this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
- }
-
- @After
- public void afterTest()
- {
- this.ioManager.shutdown();
- if (!this.ioManager.isProperlyShutDown()) {
- Assert.fail("I/O Manager was not properly shut down.");
- }
-
- if (this.memoryManager != null) {
- Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.",
- this.memoryManager.verifyEmpty());
- this.memoryManager.shutdown();
- this.memoryManager = null;
- }
- }
-
- @Test
- public void testSmallSortInOneWindow() throws Exception
- {
- try {
- final int NUM_RECORDS = 1000;
-
- // reader
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-
- // merge iterator
- LOG.debug("Initializing sortmerger...");
- Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
- this.parentTask, this.serializer, this.comparator, 1.0);
-
- runPartialSorter(sorter, NUM_RECORDS, 0);
- }
- catch (Exception t) {
- t.printStackTrace();
- Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
- }
- }
-
- @Test
- public void testLargeSortAcrossTwoWindows() throws Exception
- {
- try {
- final int NUM_RECORDS = 100000;
-
- // reader
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-
- // merge iterator
- LOG.debug("Initializing sortmerger...");
- Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
- this.parentTask, this.serializer, this.comparator, 0.2);
-
- runPartialSorter(sorter, NUM_RECORDS, 2);
- }
- catch (Exception t) {
- t.printStackTrace();
- Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
- }
- }
-
- @Test
- public void testLargeSortAcrossMultipleWindows() throws Exception
- {
- try {
- final int NUM_RECORDS = 1000000;
-
- // reader
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-
- // merge iterator
- LOG.debug("Initializing sortmerger...");
- Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
- this.parentTask, this.serializer, this.comparator, 0.15);
-
- runPartialSorter(sorter, NUM_RECORDS, 27);
- }
- catch (Exception t) {
- t.printStackTrace();
- Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
- }
- }
-
- @Test
- public void testExceptionForwarding() throws IOException
- {
- try {
- Sorter<Record> sorter = null;
- try {
- final int NUM_RECORDS = 100;
-
- // reader
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-
- // merge iterator
- LOG.debug("Initializing sortmerger...");
- sorter = new ExceptionThrowingAsynchronousPartialSorter<Record>(this.memoryManager, source,
- this.parentTask, this.serializer, this.comparator, 1.0);
-
- runPartialSorter(sorter, NUM_RECORDS, 0);
-
- Assert.fail("Expected Test Exception not thrown.");
- } catch(Exception e) {
- if (!containsTriggerException(e)) {
- throw e;
- }
- } finally {
- if (sorter != null) {
- sorter.close();
- }
- }
- }
- catch (Exception t) {
- t.printStackTrace();
- Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
- }
- }
-
- private static void runPartialSorter(Sorter<Record> sorter,
- int expectedNumResultRecords, int expectedNumWindowTransitions)
- throws Exception
- {
- // check order
- final MutableObjectIterator<Record> iterator = sorter.getIterator();
- int pairsEmitted = 1;
- int windowTransitions = 0;
-
- Record rec1 = new Record();
- Record rec2 = new Record();
-
- LOG.debug("Checking results...");
- Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
- while ((rec2 = iterator.next(rec2)) != null)
- {
- final TestData.Key k1 = rec1.getField(0, TestData.Key.class);
- final TestData.Key k2 = rec2.getField(0, TestData.Key.class);
- pairsEmitted++;
-
- // if the next key is smaller again, we have a new window
- if (k1.compareTo(k2) > 0) {
- windowTransitions++;
- }
-
- Record tmp = rec1;
- rec1 = rec2;
- k1.setKey(k2.getKey());
-
- rec2 = tmp;
- }
-
- sorter.close();
-
- Assert.assertEquals("Sorter did not return the expected number of result records.",
- expectedNumResultRecords, pairsEmitted);
- Assert.assertEquals("The partial sorter made an unexpected number of window transitions.",
- expectedNumWindowTransitions, windowTransitions);
- }
-
- private static boolean containsTriggerException(Throwable exception)
- {
- while (exception != null) {
- if (exception.getClass().equals(TriggeredException.class)) {
- return true;
- }
- exception = exception.getCause();
- }
- return false;
- }
-
- // --------------------------------------------------------------------------------------------
- // Internal classes
- // --------------------------------------------------------------------------------------------
-
- /*
- * Mock exception thrown on purpose.
- */
- @SuppressWarnings("serial")
- private static class TriggeredException extends IOException {}
-
- /*
- * Mocked sorter that throws an exception in the sorting thread.
- */
- private static class ExceptionThrowingAsynchronousPartialSorter<E> extends AsynchronousPartialSorter<E>
- {
- protected static class ExceptionThrowingSorterThread<E> extends SortingThread<E> {
-
- public ExceptionThrowingSorterThread(ExceptionHandler<IOException> exceptionHandler,
- org.apache.flink.runtime.operators.sort.UnilateralSortMerger.CircularQueues<E> queues,
- AbstractInvokable parentTask)
- {
- super(exceptionHandler, queues, parentTask);
- }
-
- @Override
- public void go() throws IOException {
- throw new TriggeredException();
- }
- }
-
- public ExceptionThrowingAsynchronousPartialSorter(MemoryManager memoryManager,
- MutableObjectIterator<E> input, AbstractInvokable parentTask,
- TypeSerializerFactory<E> serializer, TypeComparator<E> comparator,
- double memoryFraction)
- throws IOException, MemoryAllocationException
- {
- super(memoryManager, input, parentTask, serializer, comparator, memoryFraction);
- }
-
-
- @Override
- protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
- AbstractInvokable parentTask)
- {
- return new ExceptionThrowingSorterThread<E>(exceptionHandler, queues, parentTask);
- }
- }
-}