You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/01 19:52:47 UTC

[1/4] git commit: [FLINK-1075] Removed the AsynchronousPartialSorter.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 0b100517f -> c9dd60385


[FLINK-1075] Removed the AsynchronousPartialSorter.

This closes #104


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cbbcf782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cbbcf782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cbbcf782

Branch: refs/heads/master
Commit: cbbcf7820885a8a9734ffeba637b0182a6637939
Parents: 0b10051
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 28 18:28:20 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 19:15:14 2014 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     | 125 +++++---
 .../runtime/operators/ReduceCombineDriver.java  |   1 -
 .../sort/AsynchronousPartialSorter.java         | 207 -------------
 .../AsynchronousPartialSorterCollector.java     | 101 ------
 .../sort/AsynchonousPartialSorterITCase.java    | 306 -------------------
 5 files changed, 89 insertions(+), 651 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cbbcf782/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index f786c56..0452ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -23,15 +23,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Combine operator, standalone (not chained)
  * <p>
@@ -43,16 +49,26 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	
 	private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);
 
-	
+	/** Fixed-length records whose length is at most this threshold are sorted in place, if possible. */
+	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
 	private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
-	
-	private CloseableInputProvider<T> input;
 
-	private TypeSerializerFactory<T> serializerFactory;
+	private InMemorySorter<T> sorter;
+
+	private FlatCombineFunction<T> combiner;
+
+	private TypeSerializer<T> serializer;
 
 	private TypeComparator<T> comparator;
-	
-	private volatile boolean running;
+
+	private QuickSort sortAlgo = new QuickSort();
+
+	private MemoryManager memManager;
+
+	private Collector<T> output;
+
+	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
 
@@ -81,55 +97,92 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 
 	@Override
 	public void prepare() throws Exception {
-		final TaskConfig config = this.taskContext.getTaskConfig();
-		final DriverStrategy ls = config.getDriverStrategy();
+		if(this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_GROUP_COMBINE){
+			throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for " +
+					"group reduce combiner.");
+		}
 
-		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
+		this.memManager = this.taskContext.getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
 
-		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
-		this.serializerFactory = this.taskContext.getInputSerializer(0);
+		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
+		this.serializer = serializerFactory.getSerializer();
 		this.comparator = this.taskContext.getInputComparator(0);
-
-		switch (ls) {
-		case SORTED_GROUP_COMBINE:
-			this.input = new AsynchronousPartialSorter<T>(memoryManager, in, this.taskContext.getOwningNepheleTask(),
-						this.serializerFactory, this.comparator.duplicate(), config.getRelativeMemoryDriver());
-			break;
-		// obtain and return a grouped iterator from the combining sort-merger
-		default:
-			throw new RuntimeException("Invalid local strategy provided for CombineTask.");
+		this.combiner = this.taskContext.getStub();
+		this.output = this.taskContext.getOutputCollector();
+
+		final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(),
+				numMemoryPages);
+
+		// instantiate a fixed-length in-place sorter, if possible, otherwise the out-of-place sorter
+		if (this.comparator.supportsSerializationWithKeyNormalization() &&
+				this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
+		{
+			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+		} else {
+			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
 		}
 	}
 
 	@Override
 	public void run() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(this.taskContext.formatLogString("Preprocessing done, iterator obtained."));
+			LOG.debug("Combiner starting.");
 		}
 
-		final KeyGroupedIterator<T> iter = new KeyGroupedIterator<T>(this.input.getIterator(),
-				this.serializerFactory.getSerializer(), this.comparator);
+		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
+		final TypeSerializer<T> serializer = this.serializer;
+
+		T value = serializer.createInstance();
+
+		while (running && (value = in.next(value)) != null) {
+
+			// try writing to the sorter first
+			if (this.sorter.write(value)) {
+				continue;
+			}
 
-		// cache references on the stack
-		final FlatCombineFunction<T> stub = this.taskContext.getStub();
-		final Collector<T> output = this.taskContext.getOutputCollector();
+			// do the actual sorting, combining, and data writing
+			sortAndCombine();
+			this.sorter.reset();
 
-		// run stub implementation
-		while (this.running && iter.nextKey()) {
-			stub.combine(iter.getValues(), output);
+			// write the value again
+			if (!this.sorter.write(value)) {
+				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+			}
+		}
+
+		// sort, combine, and send the final batch
+		sortAndCombine();
+	}
+
+	private void sortAndCombine() throws Exception {
+		final InMemorySorter<T> sorter = this.sorter;
+
+		if (!sorter.isEmpty()) {
+			this.sortAlgo.sort(sorter);
+
+			final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
+					this.comparator);
+
+			final FlatCombineFunction<T> combiner = this.combiner;
+			final Collector<T> output = this.output;
+
+			// iterate over key groups
+			while (this.running && keyIter.nextKey()) {
+				combiner.combine(keyIter.getValues(), output);
+			}
 		}
 	}
 
 	@Override
 	public void cleanup() throws Exception {
-		if (this.input != null) {
-			this.input.close();
-			this.input = null;
-		}
+		if (this.sorter != null) { this.memManager.release(this.sorter.dispose()); }
 	}
 
 	@Override
 	public void cancel() {
 		this.running = false;
+		if (this.sorter != null) { this.memManager.release(this.sorter.dispose()); }
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cbbcf782/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 87cea30..6b18bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -109,7 +109,6 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 		
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
-		this.serializer = serializerFactory.getSerializer();
 		this.comparator = this.taskContext.getInputComparator(0);
 		this.serializer = serializerFactory.getSerializer();
 		this.reducer = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cbbcf782/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
deleted file mode 100644
index 03794ff..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * The {@link AsynchronousPartialSorter} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- */
-public class AsynchronousPartialSorter<E> extends UnilateralSortMerger<E> {
-	
-	private BufferQueueIterator bufferIterator;
-	
-	// ------------------------------------------------------------------------
-	// Constructor
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * 
-	 * 
-	 * @param memoryManager The memory manager from which to allocate the memory.
-	 * @param input The input that is sorted by this sorter.
-	 * @param parentTask The parent task, which owns all resources used by this sorter.
-	 * @param serializerFactory The type serializer.
-	 * @param comparator The type comparator establishing the order relation.
-	 * @param memoryFraction The fraction of memory dedicated to sorting.
-	 * 
-	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
-	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
-	 *                                   perform the sort.
-	 */
-	public AsynchronousPartialSorter(MemoryManager memoryManager,
-			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction)
-	throws IOException, MemoryAllocationException
-	{
-		super(memoryManager, null, input, parentTask, serializerFactory, comparator, memoryFraction, 1, 2, 0.0f, true);
-	}
-	
-
-	public void close() {
-		// make a best effort to close the buffer iterator
-		try {
-			if (this.bufferIterator != null) {
-				this.bufferIterator.close();
-				this.bufferIterator = null;
-			}
-		}
-		finally {
-			super.close();
-		}
-	}
-	
-	/* 
-	 * This method does not actually create a spilling thread, but grabs the circular queues and creates the
-	 * iterator that reads from the sort buffers in turn.
-	 */
-	@Override
-	protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
-			AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles)
-	{
-		this.bufferIterator = new BufferQueueIterator(queues);
-		setResultIterator(this.bufferIterator);
-		
-		return null;
-	}
-
-	// ------------------------------------------------------------------------
-
-	private final class BufferQueueIterator implements MutableObjectIterator<E> {
-		
-		private final CircularQueues<E> queues;
-		
-		private CircularElement<E> currentElement;
-		
-		private MutableObjectIterator<E> currentIterator;
-		
-		private volatile boolean closed = false;
-
-
-		protected BufferQueueIterator(CircularQueues<E> queues) {
-			this.queues = queues;
-		}
-
-
-		@Override
-		public E next(final E reuse) throws IOException {
-			E result;
-			if (this.currentIterator != null && ((result = this.currentIterator.next(reuse)) != null)) {
-				return result;
-			}
-			else if (this.closed) {
-				throw new IllegalStateException("The sorter has been closed.");
-			}
-			else {
-				if (AsynchronousPartialSorter.this.iteratorException != null) {
-					throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
-				}
-				
-				while (true) {
-					if (this.currentElement == endMarker()) {
-						return null;
-					}
-					else if (this.currentElement != null) {
-						// return the current element to the empty queue
-						this.currentElement.buffer.reset();
-						this.queues.empty.add(this.currentElement);
-					}
-					
-					// get a new element
-					try {
-						this.currentElement = null;
-						while (!this.closed && this.currentElement == null) {
-							this.currentElement = this.queues.spill.poll(1000, TimeUnit.MILLISECONDS);
-						}
-						if (AsynchronousPartialSorter.this.iteratorException != null) {
-							throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
-						}
-						
-						if (this.currentElement == endMarker()) {
-							// signals the end, no more buffers will come
-							// release the memory first before returning
-							releaseSortBuffers();
-							return null;
-						}
-						if (this.currentElement == spillingMarker()) {
-							this.currentElement = null;
-							continue;
-						}
-					}
-					catch (InterruptedException e) {
-						throw new RuntimeException("Iterator was interrupted getting the next sortedBuffer.");
-					}
-					
-					this.currentIterator = this.currentElement.buffer.getIterator();
-					if ((result = this.currentIterator.next(reuse)) != null) {
-						return result;
-					}
-					this.currentIterator = null;
-				}
-			}
-		}
-		
-		public void close() {
-			synchronized (this) {
-				if (this.closed) {
-					return;
-				}
-				this.closed = true;
-			}
-			
-			if (this.currentElement != null) {
-				this.queues.empty.add(this.currentElement);
-				this.currentElement = null;
-			}
-			if (this.currentIterator != null) {
-				this.currentIterator = null;
-			}
-		}
-		
-		private final void releaseSortBuffers() 	{
-			while (!this.queues.empty.isEmpty()) {
-				final CircularElement<E> elem = this.queues.empty.poll();
-				if (elem != null) {
-					final InMemorySorter<E> sorter = elem.buffer;
-					final List<MemorySegment> segments = sorter.dispose();
-					AsynchronousPartialSorter.this.memoryManager.release(segments);
-				}
-			}
-		}
-
-	};
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cbbcf782/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
deleted file mode 100644
index a41dbf1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The {@link AsynchronousPartialSorterCollector} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- * <p>
- * In contract to the {@link AsynchronousPartialSorter}, this class has no dedicated reading thread that
- * pulls records from an iterator, but offers a collector into which data to be sorted is pushed.
- * 
- */
-public class AsynchronousPartialSorterCollector<E> extends AsynchronousPartialSorter<E> {
-	
-	private InputDataCollector<E> collector;
-	
-	// ------------------------------------------------------------------------
-	// Constructor
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * @param memoryManager The memory manager from which to allocate the memory.
-	 * @param parentTask The parent task, which owns all resources used by this sorter.
-	 * @param serializerFactory The type serializer.
-	 * @param comparator The type comparator establishing the order relation.
-	 * @param memoryFraction The fraction of memory dedicated to sorting.
-	 * 
-	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
-	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
-	 *                                   perform the sort.
-	 */
-	public AsynchronousPartialSorterCollector(MemoryManager memoryManager,
-			AbstractInvokable parentTask, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction)
-	throws IOException, MemoryAllocationException
-	{
-		super(memoryManager, null, parentTask, serializerFactory, comparator,
-				memoryFraction);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the collector that writes into the sort buffers.
-	 * 
-	 * @return The collector that writes into the sort buffers.
-	 */
-	public InputDataCollector<E> getInputCollector() {
-		return this.collector;
-	}
-
-	@Override
-	protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler,
-		MutableObjectIterator<E> reader, CircularQueues<E> queues, AbstractInvokable parentTask,
-		TypeSerializer<E> serializer, long startSpillingBytes)
-	{
-		this.collector = new InputDataCollector<E>(queues, startSpillingBytes);
-		return null;
-	}
-	
-
-	public void close() {
-		try {
-			if (this.collector != null) {
-				this.collector.close();
-			}
-		}
-		finally {
-			super.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cbbcf782/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
deleted file mode 100644
index 90fe59f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.sort.ExceptionHandler;
-import org.apache.flink.runtime.operators.sort.Sorter;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class AsynchonousPartialSorterITCase {
-	
-	private static final Log LOG = LogFactory.getLog(AsynchonousPartialSorterITCase.class);
-
-	private static final long SEED = 649180756312423613L;
-
-	private static final int KEY_MAX = Integer.MAX_VALUE;
-
-	private static final Value VAL = new Value("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
-	
-	private static final int VALUE_LENGTH = 114;
-
-	public static final int MEMORY_SIZE = 1024 * 1024 * 32;
-	
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-
-	private MemoryManager memoryManager;
-	
-	private TypeSerializerFactory<Record> serializer;
-	
-	private TypeComparator<Record> comparator;
-
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void beforeTest()
-	{
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1);
-		this.ioManager = new IOManager();
-		this.serializer = RecordSerializerFactory.get();
-		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-	}
-
-	@After
-	public void afterTest()
-	{
-		this.ioManager.shutdown();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
-		
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-
-	@Test
-	public void testSmallSortInOneWindow() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 1000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 1.0);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 0);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testLargeSortAcrossTwoWindows() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 100000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 0.2);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 2);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testLargeSortAcrossMultipleWindows() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 1000000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 0.15);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 27);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testExceptionForwarding() throws IOException
-	{
-		try {
-			Sorter<Record> sorter = null;
-			try	{
-				final int NUM_RECORDS = 100;
-
-				// reader
-				final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-				final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-				
-				// merge iterator
-				LOG.debug("Initializing sortmerger...");
-				sorter = new ExceptionThrowingAsynchronousPartialSorter<Record>(this.memoryManager, source,
-						this.parentTask, this.serializer, this.comparator, 1.0);
-		
-				runPartialSorter(sorter, NUM_RECORDS, 0);
-				
-				Assert.fail("Expected Test Exception not thrown.");
-			} catch(Exception e) {
-				if (!containsTriggerException(e)) {
-					throw e;
-				}
-			} finally {
-				if (sorter != null) {
-					sorter.close();
-				}
-			}
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	private static void runPartialSorter(Sorter<Record> sorter, 
-								int expectedNumResultRecords, int expectedNumWindowTransitions)
-	throws Exception
-	{
-		// check order
-		final MutableObjectIterator<Record> iterator = sorter.getIterator();
-		int pairsEmitted = 1;
-		int windowTransitions = 0;
-		
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		
-		LOG.debug("Checking results...");
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		while ((rec2 = iterator.next(rec2)) != null)
-		{
-			final TestData.Key k1 = rec1.getField(0, TestData.Key.class);
-			final TestData.Key k2 = rec2.getField(0, TestData.Key.class);
-			pairsEmitted++;
-			
-			// if the next key is smaller again, we have a new window
-			if (k1.compareTo(k2) > 0) {
-				windowTransitions++;
-			}
-			
-			Record tmp = rec1;
-			rec1 = rec2;
-			k1.setKey(k2.getKey());
-			
-			rec2 = tmp;
-		}
-		
-		sorter.close();
-		
-		Assert.assertEquals("Sorter did not return the expected number of result records.",
-			expectedNumResultRecords, pairsEmitted);
-		Assert.assertEquals("The partial sorter made an unexpected number of window transitions.",
-			expectedNumWindowTransitions, windowTransitions); 
-	}
-	
-	private static boolean containsTriggerException(Throwable exception)
-	{
-		while (exception != null) {
-			if (exception.getClass().equals(TriggeredException.class)) {
-				return true;
-			}
-			exception = exception.getCause();
-		}
-		return false;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//              					 Internal classes
-	// --------------------------------------------------------------------------------------------
-	
-	/*
-	 * Mock exception thrown on purpose.
-	 */
-	@SuppressWarnings("serial")
-	private static class TriggeredException extends IOException {}
-	
-	/*
-	 * Mocked sorter that throws an exception in the sorting thread.
-	 */
-	private static class ExceptionThrowingAsynchronousPartialSorter<E> extends AsynchronousPartialSorter<E>
-	{	
-		protected static class ExceptionThrowingSorterThread<E> extends SortingThread<E> {
-				
-			public ExceptionThrowingSorterThread(ExceptionHandler<IOException> exceptionHandler,
-						org.apache.flink.runtime.operators.sort.UnilateralSortMerger.CircularQueues<E> queues,
-						AbstractInvokable parentTask)
-			{
-				super(exceptionHandler, queues, parentTask);
-			}
-	
-			@Override
-			public void go() throws IOException {
-				throw new TriggeredException();
-			}
-		}
-
-		public ExceptionThrowingAsynchronousPartialSorter(MemoryManager memoryManager,
-				MutableObjectIterator<E> input, AbstractInvokable parentTask, 
-				TypeSerializerFactory<E> serializer, TypeComparator<E> comparator,
-				double memoryFraction)
-		throws IOException, MemoryAllocationException
-		{
-			super(memoryManager, input, parentTask, serializer, comparator, memoryFraction);
-		}
-
-
-		@Override
-		protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
-				AbstractInvokable parentTask)
-		{
-			return new ExceptionThrowingSorterThread<E>(exceptionHandler, queues, parentTask);
-		}		
-	}
-}


[2/4] git commit: [FLINK-1079] Fix inconsistent parameter naming

Posted by se...@apache.org.
[FLINK-1079] Fix inconsistent parameter naming


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fbed013d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fbed013d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fbed013d

Branch: refs/heads/master
Commit: fbed013db60d7c45dcd11b6303ffa16220557e13
Parents: cbbcf78
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 1 19:01:40 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 19:15:17 2014 +0200

----------------------------------------------------------------------
 .../flink/api/common/functions/MapPartitionFunction.java    | 4 ++--
 .../flink/api/java/functions/RichMapPartitionFunction.java  | 2 +-
 .../api/java/record/functions/MapPartitionFunction.java     | 4 ++--
 .../org/apache/flink/test/operators/MapPartitionITCase.java | 9 ++++-----
 4 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbed013d/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
index fbd5c9e..1c21db2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
@@ -45,9 +45,9 @@ public interface MapPartitionFunction<T, O> extends Function, Serializable {
 	/**
 	 * A user-implemented function that modifies or transforms an incoming object.
 	 *
-	 * @param records All records for the mapper
+	 * @param values All records for the mapper
 	 * @param out The collector to hand results to.
 	 * @throws Exception
 	 */
-	void mapPartition(Iterable<T> records, Collector<O> out) throws Exception;
+	void mapPartition(Iterable<T> values, Collector<O> out) throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbed013d/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
index 04176a2..4a3beab 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
@@ -37,5 +37,5 @@ public abstract class RichMapPartitionFunction<I, O> extends AbstractRichFunctio
 	private static final long serialVersionUID = 1L;
 	
 	@Override
-	public abstract void mapPartition(Iterable<I> records, Collector<O> out) throws Exception;
+	public abstract void mapPartition(Iterable<I> values, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbed013d/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
index 4eb049d..0543286 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
@@ -34,7 +34,7 @@ public abstract class MapPartitionFunction extends AbstractRichFunction implemen
 	 * This method must be implemented to provide a user implementation of a MapPartitionFunction.
 	 * It is called for a full input set.
 	 *
-	 * @param records all input records
+	 * @param values all input records
 	 * @param out A collector that collects all output records.
 	 *
 	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
@@ -42,5 +42,5 @@ public abstract class MapPartitionFunction extends AbstractRichFunction implemen
 	 *                   decide whether to retry the mapper execution.
 	 */
 	@Override
-	public abstract void mapPartition(Iterable<Record> records, Collector<Record> out) throws Exception;
+	public abstract void mapPartition(Iterable<Record> values, Collector<Record> out) throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fbed013d/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
index 55b9544..1008c1d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.operators;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -84,10 +83,10 @@ public class MapPartitionITCase extends JavaProgramTestBase {
 	public static class TestMapPartition implements MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
 
 		@Override
-		public void mapPartition(Iterable<Tuple2<String, String>> records, Collector<Tuple2<String, Integer>> out) {
-			for (Tuple2<String, String> record : records) {
-				String keyString = record.f0;
-				String valueString = record.f1;
+		public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<Tuple2<String, Integer>> out) {
+			for (Tuple2<String, String> value : values) {
+				String keyString = value.f0;
+				String valueString = value.f1;
 				
 				int keyInt = Integer.parseInt(keyString);
 				int valueInt = Integer.parseInt(valueString);


[3/4] git commit: [FLINK-1084] Fix broken links in "How to add an operator"

Posted by se...@apache.org.
[FLINK-1084] Fix broken links in "How to add an operator"


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c9dd6038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c9dd6038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c9dd6038

Branch: refs/heads/master
Commit: c9dd60385e05f9723d01554b8674e0c620de49c2
Parents: 0a6e53d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 1 19:14:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 19:15:17 2014 +0200

----------------------------------------------------------------------
 docs/internal_add_operator.md       | 12 ++++++------
 docs/internal_program_life_cycle.md |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9dd6038/docs/internal_add_operator.md
----------------------------------------------------------------------
diff --git a/docs/internal_add_operator.md b/docs/internal_add_operator.md
index 40b0a69..f3c9758 100644
--- a/docs/internal_add_operator.md
+++ b/docs/internal_add_operator.md
@@ -56,7 +56,7 @@ public static <T>DataSet<Long> count(DataSet<T> data) {
 
 A more complex example of an operation via specialization is the {% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java "Aggregation Operation" %} in the Java API. It is implemented by means of a *GroupReduce* UDF.
 
-The Aggregate Operation comes with its own operator in the *Java API*, but translates itself into a {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java "GroupReduceOperatorBase" %} in the *Common API*. (see [Program Life Cycle](program_life_cycle.html) for details of how an operation from the *Java API* becomes an operation of the *Common API* and finally a runtime operation.)
+The Aggregate Operation comes with its own operator in the *Java API*, but translates itself into a {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java "GroupReduceOperatorBase" %} in the *Common API*. (see [Program Life Cycle](internal_program_life_cycle.html) for details of how an operation from the *Java API* becomes an operation of the *Common API* and finally a runtime operation.)
 The Java API aggregation operator is only a builder that takes the types of aggregations and the field positions, and used that information to
 parameterize the GroupReduce UDF that performs the aggregations.
 
@@ -109,7 +109,7 @@ function, but invoked only once per parallel partition.
 
 **Runtime**
 
-Runtime Operators are implemented using the {% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/PactDriver.java "Driver" %} interface. The interface defines the methods that describe the operator towards the runtime. The {% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/MapDriver.java "MapDriver" %} serves as a simple example of how those operators work.
+Runtime Operators are implemented using the {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java "Driver" %} interface. The interface defines the methods that describe the operator towards the runtime. The {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java "MapDriver" %} serves as a simple example of how those operators work.
 
 The runtime works with the `MutableObjectIterator`, which describes data streams with the ability to reuse objects, to reduce pressure on the garbage collector.
 
@@ -132,16 +132,16 @@ To increase efficiency, it is often beneficial to implement a *chained* version
 operators run in the same thread as their preceding operator, and work with nested function calls.
 This is very efficient, because it saves serialization/deserialization overhead.
 
-To learn how to implement a chained operator, take a look at the {% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/MapDriver.java "MapDriver" %} (regular) and the
-{% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/chaining/ChainedMapDriver.java "ChainedMapDriver" %} (chained variant).
+To learn how to implement a chained operator, take a look at the {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java "MapDriver" %} (regular) and the
+{% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java "ChainedMapDriver" %} (chained variant).
 
 
 **Optimizer/Compiler**
 
-This section does a minimal discussion of the important steps to add an operator. Please see the [Optimizer](optimizer.html) docs for more detail on how the optimizer works.
+This section does a minimal discussion of the important steps to add an operator. Please see the [Optimizer](internal_optimizer.html) docs for more detail on how the optimizer works.
 To allow the optimizer to include a new operator in its planning, it needs a bit of information about it; in particular, the following information:
 
-- *{% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/DriverStrategy.java "DriverStrategy" %}*: The operation needs to be added to the Enum, to make it available to the optimizer. The parameters to the Enum entry define which class implements the runtime operator, its chained version, whether the operator accumulates records (and needs memory for that), and whether it requires a comparator (works on keys). For our example, we can add the entry
+- *{% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java "DriverStrategy" %}*: The operation needs to be added to the Enum, to make it available to the optimizer. The parameters to the Enum entry define which class implements the runtime operator, its chained version, whether the operator accumulates records (and needs memory for that), and whether it requires a comparator (works on keys). For our example, we can add the entry
 ``` java
 MAP_PARTITION(MapPartitionDriver.class, null /* or chained variant */, PIPELINED, false)
 ```

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9dd6038/docs/internal_program_life_cycle.md
----------------------------------------------------------------------
diff --git a/docs/internal_program_life_cycle.md b/docs/internal_program_life_cycle.md
index c68232b..90d31cc 100644
--- a/docs/internal_program_life_cycle.md
+++ b/docs/internal_program_life_cycle.md
@@ -2,4 +2,4 @@
 title:  "Program Life Cycle"
 ---
 
-
+To be done...


[4/4] git commit: [FLINK-1078] PrintingOutputFormat uses same partition indexing as FileOutputFormat.

Posted by se...@apache.org.
[FLINK-1078] PrintingOutputFormat uses same partition indexing as FileOutputFormat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0a6e53dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0a6e53dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0a6e53dd

Branch: refs/heads/master
Commit: 0a6e53ddc05ba901e4b04c9208c2893e686bdb42
Parents: fbed013
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 1 19:03:04 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 19:15:17 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/PrintingOutputFormat.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0a6e53dd/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 435057f..42e0b46 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -74,7 +74,7 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> {
 		this.stream = this.target == STD_OUT ? System.out : System.err;
 		
 		// set the prefix if we have a >1 DOP
-		this.prefix = (numTasks > 1) ? (taskNumber + "> ") : null;
+		this.prefix = (numTasks > 1) ? ((taskNumber+1) + "> ") : null;
 	}
 
 	@Override