You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/13 17:36:04 UTC

flink git commit: [hotfix] [tests] Add cross transformation to OverwriteObjects manual test

Repository: flink
Updated Branches:
  refs/heads/master e7d78e63a -> 6c061684f


[hotfix] [tests] Add cross transformation to OverwriteObjects manual test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c061684
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c061684
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c061684

Branch: refs/heads/master
Commit: 6c061684f93ddcd375f37d3ef71f6dd66783df8b
Parents: e7d78e6
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Apr 13 10:48:52 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Apr 13 10:51:06 2016 -0400

----------------------------------------------------------------------
 .../flink/test/manual/OverwriteObjects.java     | 67 ++++++++++++++++++--
 1 file changed, 63 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c061684/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
index 1a70c69..23d8e67 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.manual;
 
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -73,6 +74,7 @@ public class OverwriteObjects {
 			testReduce(env);
 			testGroupedReduce(env);
 			testJoin(env);
+			testCross(env);
 		}
 	}
 
@@ -127,7 +129,7 @@ public class OverwriteObjects {
 		Assert.assertEquals(disabledChecksum, enabledChecksum);
 	}
 
-	public class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
+	private class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
 		private Scrambler scrambler;
 
 		public OverwriteObjectsReduce(boolean keyed) {
@@ -252,7 +254,7 @@ public class OverwriteObjects {
 		}
 	}
 
-	public class OverwriteObjectsJoin implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
+	private class OverwriteObjectsJoin implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
 		private Scrambler scrambler = new Scrambler(true);
 
 		@Override
@@ -263,12 +265,69 @@ public class OverwriteObjects {
 
 	// --------------------------------------------------------------------------------------------
 
-	private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env) {
+	public void testCross(ExecutionEnvironment env) throws Exception {
+		/*
+		 * Test CrossDriver
+		 */
+
+		LOG.info("Testing cross");
+
+		DataSet<Tuple2<IntValue, IntValue>> small = getDataSet(env, 100, 20);
+		DataSet<Tuple2<IntValue, IntValue>> large = getDataSet(env, 10000, 2000);
+
+		// test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse enabled
+
+		env.getConfig().enableObjectReuse();
+
+		ChecksumHashCode enabledChecksumWithHuge = DataSetUtils.checksumHashCode(small
+			.crossWithHuge(large)
+			.with(new OverwriteObjectsCross()));
+
+		ChecksumHashCode enabledChecksumWithTiny = DataSetUtils.checksumHashCode(small
+			.crossWithTiny(large)
+			.with(new OverwriteObjectsCross()));
+
+		Assert.assertEquals(enabledChecksumWithHuge, enabledChecksumWithTiny);
+
+		// test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse disabled
+
+		env.getConfig().disableObjectReuse();
+
+		ChecksumHashCode disabledChecksumWithHuge = DataSetUtils.checksumHashCode(small
+			.crossWithHuge(large)
+			.with(new OverwriteObjectsCross()));
+
+		ChecksumHashCode disabledChecksumWithTiny = DataSetUtils.checksumHashCode(small
+			.crossWithTiny(large)
+			.with(new OverwriteObjectsCross()));
+
+		Assert.assertEquals(disabledChecksumWithHuge, disabledChecksumWithTiny);
+
+		// verify that checksums match between object reuse enabled and disabled
+		Assert.assertEquals(enabledChecksumWithHuge, disabledChecksumWithHuge);
+	}
+
+	private class OverwriteObjectsCross implements CrossFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
+		private Scrambler scrambler = new Scrambler(true);
+
+		@Override
+		public Tuple2<IntValue, IntValue> cross(Tuple2<IntValue, IntValue> a, Tuple2<IntValue, IntValue> b) throws Exception {
+			return scrambler.scramble(a, b);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env, int numberOfElements, int keyRange) {
 		return env
-			.fromCollection(new TupleIntValueIntValueIterator(NUMBER_OF_ELEMENTS, KEY_RANGE),
+			.fromCollection(new TupleIntValueIntValueIterator(numberOfElements, keyRange),
 				TupleTypeInfo.<Tuple2<IntValue, IntValue>>getBasicAndBasicValueTupleTypeInfo(IntValue.class, IntValue.class));
 	}
 
+	private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env) {
+		return getDataSet(env, NUMBER_OF_ELEMENTS, KEY_RANGE);
+	}
+
 	private DataSet<Tuple2<IntValue, IntValue>> getFilteredDataSet(ExecutionEnvironment env) {
 		return getDataSet(env)
 			.filter(new FilterFunction<Tuple2<IntValue, IntValue>>() {