You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 17:11:58 UTC

[3/3] flink git commit: [FLINK-1085] [runtime] Combiner forwards oversized records, rather than failing on them.

[FLINK-1085] [runtime] Combiner forwards oversized records, rather than failing on them.

This closes #854


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72718811
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72718811
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72718811

Branch: refs/heads/master
Commit: 7271881163d240ad1106a77036dce981dafb82f3
Parents: 7761ddb
Author: dabaitu <to...@gmail.com>
Authored: Sat Jun 20 15:35:48 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 16:29:38 2015 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     | 53 +++++++++-----------
 .../runtime/operators/CombineTaskTest.java      | 48 ++++++++++++++----
 .../java/org/apache/flink/yarn/UtilsTest.java   |  2 +-
 3 files changed, 63 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 493eb4f..c426295 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
@@ -79,6 +78,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 	private Collector<OUT> output;
 
+	private long oversizedRecordCount = 0L;
+
 	private volatile boolean running = true;
 
 	private boolean objectReuseEnabled = false;
@@ -142,7 +143,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 		this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("GroupReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+			LOG.debug("GroupReduceCombineDriver object reuse: {}.", (this.objectReuseEnabled ? "ENABLED" : "DISABLED"));
 		}
 	}
 
@@ -170,7 +171,10 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 			// write the value again
 			if (!this.sorter.write(value)) {
-				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+				++oversizedRecordCount;
+				LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
+				// simply forward the record
+				this.output.collect((OUT)value);
 			}
 		}
 
@@ -179,39 +183,28 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 	}
 
 	private void sortAndCombine() throws Exception {
+		if (sorter.isEmpty()) {
+			return;
+		}
+
 		final InMemorySorter<IN> sorter = this.sorter;
+		this.sortAlgo.sort(sorter);
+		final GroupCombineFunction<IN, OUT> combiner = this.combiner;
+		final Collector<OUT> output = this.output;
 
+		// iterate over key groups
 		if (objectReuseEnabled) {
-			if (!sorter.isEmpty()) {
-				this.sortAlgo.sort(sorter);
-
-				final ReusingKeyGroupedIterator<IN> keyIter = 
-						new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
-
-				final GroupCombineFunction<IN, OUT> combiner = this.combiner;
-				final Collector<OUT> output = this.output;
-
-				// iterate over key groups
-				while (this.running && keyIter.nextKey()) {
-					combiner.combine(keyIter.getValues(), output);
-				}
+			final ReusingKeyGroupedIterator<IN> keyIter =
+					new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
+			while (this.running && keyIter.nextKey()) {
+				combiner.combine(keyIter.getValues(), output);
 			}
 		} else {
-			if (!sorter.isEmpty()) {
-				this.sortAlgo.sort(sorter);
-
-				final NonReusingKeyGroupedIterator<IN> keyIter = 
-						new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
-
-				final GroupCombineFunction<IN, OUT> combiner = this.combiner;
-				final Collector<OUT> output = this.output;
-
-				// iterate over key groups
-				while (this.running && keyIter.nextKey()) {
-					combiner.combine(keyIter.getValues(), output);
-				}
+			final NonReusingKeyGroupedIterator<IN> keyIter = 
+					new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
+			while (this.running && keyIter.nextKey()) {
+				combiner.combine(keyIter.getValues(), output);
 			}
-
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 3d9e991..7772151 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -19,23 +19,21 @@
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.operators.testutils.*;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
 import org.junit.Test;
 
 public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>>
@@ -65,7 +63,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		addDriverComparator(this.comparator);
 		addDriverComparator(this.comparator);
 		setOutput(this.outList);
-		
+
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
@@ -92,7 +90,39 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		
 		this.outList.clear();
 	}
-	
+
+	@Test
+	public void testOversizedRecordCombineTask() {
+		int tenMil = 10000000;
+		Generator g = new Generator(561349061987311L, 1, tenMil);
+		//generate 10 records each of size 10MB
+		final TestData.GeneratorIterator gi = new TestData.GeneratorIterator(g, 10);
+		List<MutableObjectIterator<Record>> inputs = new ArrayList<MutableObjectIterator<Record>>();
+		inputs.add(gi);
+
+		addInput(new UnionIterator<Record>(inputs));
+		addDriverComparator(this.comparator);
+		addDriverComparator(this.comparator);
+		setOutput(this.outList);
+
+		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+		getTaskConfig().setRelativeMemoryDriver(combine_frac);
+		getTaskConfig().setFilehandlesDriver(2);
+
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+
+		try {
+			testDriver(testTask, MockCombiningReduceStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Invoke method caused exception.");
+		}
+
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+10, this.outList.size() == 10);
+
+		this.outList.clear();
+	}
+
 	@Test
 	public void testFailingCombineTask() {
 		int keyCnt = 100;
@@ -119,7 +149,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 			Assert.fail("Test failed due to an exception.");
 		}
 	}
-	
+
 	@Test
 	public void testCancelCombineTaskSorting()
 	{

http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 25a1413..9ee60a5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -36,8 +36,8 @@ public class UtilsTest {
 	@Test
 	public void testUberjarLocator() {
 		File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
-		Assert.assertTrue(dir.getName().endsWith(".jar"));
 		Assert.assertNotNull(dir);
+		Assert.assertTrue(dir.getName().endsWith(".jar"));
 		dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
 		Assert.assertTrue(dir.exists());
 		Assert.assertTrue(dir.isDirectory());