You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/13 17:36:04 UTC
flink git commit: [hotfix] [tests] Add cross transformation to OverwriteObjects manual test
Repository: flink
Updated Branches:
refs/heads/master e7d78e63a -> 6c061684f
[hotfix] [tests] Add cross transformation to OverwriteObjects manual test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c061684
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c061684
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c061684
Branch: refs/heads/master
Commit: 6c061684f93ddcd375f37d3ef71f6dd66783df8b
Parents: e7d78e6
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Apr 13 10:48:52 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Apr 13 10:51:06 2016 -0400
----------------------------------------------------------------------
.../flink/test/manual/OverwriteObjects.java | 67 ++++++++++++++++++--
1 file changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c061684/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
index 1a70c69..23d8e67 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.manual;
+import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -73,6 +74,7 @@ public class OverwriteObjects {
testReduce(env);
testGroupedReduce(env);
testJoin(env);
+ testCross(env);
}
}
@@ -127,7 +129,7 @@ public class OverwriteObjects {
Assert.assertEquals(disabledChecksum, enabledChecksum);
}
- public class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
+ private class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
private Scrambler scrambler;
public OverwriteObjectsReduce(boolean keyed) {
@@ -252,7 +254,7 @@ public class OverwriteObjects {
}
}
- public class OverwriteObjectsJoin implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
+ private class OverwriteObjectsJoin implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
private Scrambler scrambler = new Scrambler(true);
@Override
@@ -263,12 +265,69 @@ public class OverwriteObjects {
// --------------------------------------------------------------------------------------------
- private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env) {
+ public void testCross(ExecutionEnvironment env) throws Exception {
+ /*
+ * Test CrossDriver
+ *
+ * Builds a small data set (100 elements, key range 20) and a large data set
+ * (10000 elements, key range 2000), then crosses them with both
+ * crossWithHuge and crossWithTiny, first with object reuse enabled and then
+ * with object reuse disabled. Every cross result is reduced to a
+ * ChecksumHashCode; the checksums are asserted to be equal across the two
+ * cross variants and across the two object reuse settings.
+ */
+
+ LOG.info("Testing cross");
+
+ DataSet<Tuple2<IntValue, IntValue>> small = getDataSet(env, 100, 20);
+ DataSet<Tuple2<IntValue, IntValue>> large = getDataSet(env, 10000, 2000);
+
+ // test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse enabled
+
+ env.getConfig().enableObjectReuse();
+
+ ChecksumHashCode enabledChecksumWithHuge = DataSetUtils.checksumHashCode(small
+ .crossWithHuge(large)
+ .with(new OverwriteObjectsCross()));
+
+ ChecksumHashCode enabledChecksumWithTiny = DataSetUtils.checksumHashCode(small
+ .crossWithTiny(large)
+ .with(new OverwriteObjectsCross()));
+
+ Assert.assertEquals(enabledChecksumWithHuge, enabledChecksumWithTiny);
+
+ // test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse disabled
+
+ env.getConfig().disableObjectReuse();
+
+ ChecksumHashCode disabledChecksumWithHuge = DataSetUtils.checksumHashCode(small
+ .crossWithHuge(large)
+ .with(new OverwriteObjectsCross()));
+
+ ChecksumHashCode disabledChecksumWithTiny = DataSetUtils.checksumHashCode(small
+ .crossWithTiny(large)
+ .with(new OverwriteObjectsCross()));
+
+ Assert.assertEquals(disabledChecksumWithHuge, disabledChecksumWithTiny);
+
+ // verify that checksums match between object reuse enabled and disabled
+ Assert.assertEquals(enabledChecksumWithHuge, disabledChecksumWithHuge);
+ }
+
+ private class OverwriteObjectsCross implements CrossFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> { // cross function used by testCross; hands every pair of input tuples to its Scrambler
+ private Scrambler scrambler = new Scrambler(true); // NOTE(review): the meaning of the 'true' argument is defined by Scrambler, which is not shown in this diff - confirm
+
+ @Override
+ public Tuple2<IntValue, IntValue> cross(Tuple2<IntValue, IntValue> a, Tuple2<IntValue, IntValue> b) throws Exception {
+ return scrambler.scramble(a, b); // the emitted tuple is whatever Scrambler.scramble returns for (a, b)
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env, int numberOfElements, int keyRange) { // creates the input DataSet from a TupleIntValueIntValueIterator constructed with the given numberOfElements and keyRange
return env
- .fromCollection(new TupleIntValueIntValueIterator(NUMBER_OF_ELEMENTS, KEY_RANGE),
+ .fromCollection(new TupleIntValueIntValueIterator(numberOfElements, keyRange),
TupleTypeInfo.<Tuple2<IntValue, IntValue>>getBasicAndBasicValueTupleTypeInfo(IntValue.class, IntValue.class));
}
+ private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env) { // convenience overload: delegates to getDataSet(env, numberOfElements, keyRange) with the NUMBER_OF_ELEMENTS and KEY_RANGE constants
+ return getDataSet(env, NUMBER_OF_ELEMENTS, KEY_RANGE);
+ }
+
private DataSet<Tuple2<IntValue, IntValue>> getFilteredDataSet(ExecutionEnvironment env) {
return getDataSet(env)
.filter(new FilterFunction<Tuple2<IntValue, IntValue>>() {