You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 17:11:58 UTC
[3/3] flink git commit: [FLINK-1085] [runtime] Combiner forwards oversized records, rather than failing on them.
[FLINK-1085] [runtime] Combiner forwards oversized records, rather than failing on them.
This closes #854
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72718811
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72718811
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72718811
Branch: refs/heads/master
Commit: 7271881163d240ad1106a77036dce981dafb82f3
Parents: 7761ddb
Author: dabaitu <to...@gmail.com>
Authored: Sat Jun 20 15:35:48 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 16:29:38 2015 +0200
----------------------------------------------------------------------
.../operators/GroupReduceCombineDriver.java | 53 +++++++++-----------
.../runtime/operators/CombineTaskTest.java | 48 ++++++++++++++----
.../java/org/apache/flink/yarn/UtilsTest.java | 2 +-
3 files changed, 63 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 493eb4f..c426295 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-import java.io.IOException;
import java.util.List;
/**
@@ -79,6 +78,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
private Collector<OUT> output;
+ private long oversizedRecordCount = 0L;
+
private volatile boolean running = true;
private boolean objectReuseEnabled = false;
@@ -142,7 +143,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
if (LOG.isDebugEnabled()) {
- LOG.debug("GroupReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+ LOG.debug("GroupReduceCombineDriver object reuse: {}.", (this.objectReuseEnabled ? "ENABLED" : "DISABLED"));
}
}
@@ -170,7 +171,10 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
// write the value again
if (!this.sorter.write(value)) {
- throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+ ++oversizedRecordCount;
+ LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
+ // simply forward the record
+ this.output.collect((OUT)value);
}
}
@@ -179,39 +183,28 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
}
private void sortAndCombine() throws Exception {
+ if (sorter.isEmpty()) {
+ return;
+ }
+
final InMemorySorter<IN> sorter = this.sorter;
+ this.sortAlgo.sort(sorter);
+ final GroupCombineFunction<IN, OUT> combiner = this.combiner;
+ final Collector<OUT> output = this.output;
+ // iterate over key groups
if (objectReuseEnabled) {
- if (!sorter.isEmpty()) {
- this.sortAlgo.sort(sorter);
-
- final ReusingKeyGroupedIterator<IN> keyIter =
- new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
-
- final GroupCombineFunction<IN, OUT> combiner = this.combiner;
- final Collector<OUT> output = this.output;
-
- // iterate over key groups
- while (this.running && keyIter.nextKey()) {
- combiner.combine(keyIter.getValues(), output);
- }
+ final ReusingKeyGroupedIterator<IN> keyIter =
+ new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
+ while (this.running && keyIter.nextKey()) {
+ combiner.combine(keyIter.getValues(), output);
}
} else {
- if (!sorter.isEmpty()) {
- this.sortAlgo.sort(sorter);
-
- final NonReusingKeyGroupedIterator<IN> keyIter =
- new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
-
- final GroupCombineFunction<IN, OUT> combiner = this.combiner;
- final Collector<OUT> output = this.output;
-
- // iterate over key groups
- while (this.running && keyIter.nextKey()) {
- combiner.combine(keyIter.getValues(), output);
- }
+ final NonReusingKeyGroupedIterator<IN> keyIter =
+ new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
+ while (this.running && keyIter.nextKey()) {
+ combiner.combine(keyIter.getValues(), output);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 3d9e991..7772151 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -19,23 +19,21 @@
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.operators.testutils.*;
import org.junit.Assert;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
import org.junit.Test;
public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>>
@@ -65,7 +63,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
addDriverComparator(this.comparator);
addDriverComparator(this.comparator);
setOutput(this.outList);
-
+
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);
@@ -92,7 +90,39 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
this.outList.clear();
}
-
+
+ @Test
+ public void testOversizedRecordCombineTask() {
+ int tenMil = 10000000;
+ Generator g = new Generator(561349061987311L, 1, tenMil);
+ //generate 10 records each of size 10MB
+ final TestData.GeneratorIterator gi = new TestData.GeneratorIterator(g, 10);
+ List<MutableObjectIterator<Record>> inputs = new ArrayList<MutableObjectIterator<Record>>();
+ inputs.add(gi);
+
+ addInput(new UnionIterator<Record>(inputs));
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ setOutput(this.outList);
+
+ getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+ getTaskConfig().setRelativeMemoryDriver(combine_frac);
+ getTaskConfig().setFilehandlesDriver(2);
+
+ final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+
+ try {
+ testDriver(testTask, MockCombiningReduceStub.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Invoke method caused exception.");
+ }
+
+ Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+10, this.outList.size() == 10);
+
+ this.outList.clear();
+ }
+
@Test
public void testFailingCombineTask() {
int keyCnt = 100;
@@ -119,7 +149,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
Assert.fail("Test failed due to an exception.");
}
}
-
+
@Test
public void testCancelCombineTaskSorting()
{
http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 25a1413..9ee60a5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -36,8 +36,8 @@ public class UtilsTest {
@Test
public void testUberjarLocator() {
File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
- Assert.assertTrue(dir.getName().endsWith(".jar"));
Assert.assertNotNull(dir);
+ Assert.assertTrue(dir.getName().endsWith(".jar"));
dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
Assert.assertTrue(dir.exists());
Assert.assertTrue(dir.isDirectory());