You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:10 UTC
[10/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
index 6215f31..638eb5c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.IntValue;
+
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,13 +42,13 @@ import java.util.Random;
import static org.hamcrest.Matchers.is;
-/*
+/**
* These programs demonstrate the effects of user defined functions which modify input objects or return locally created
* objects that are retained and reused on future calls. The programs do not retain and later modify input objects.
*/
public class OverwriteObjects {
- public final static Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);
+ public static final Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);
// DataSets are created with this number of elements
private static final int NUMBER_OF_ELEMENTS = 3_000_000;
@@ -71,7 +72,7 @@ public class OverwriteObjects {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
- for (int parallelism = MAX_PARALLELISM ; parallelism > 0 ; parallelism--) {
+ for (int parallelism = MAX_PARALLELISM; parallelism > 0; parallelism--) {
LOG.info("Parallelism = {}", parallelism);
env.setParallelism(parallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
index c8604cb..46be968 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/ReducePerformance.java
@@ -36,7 +36,7 @@ import java.util.Random;
* (See also http://peel-framework.org/2016/04/07/hash-aggregations-in-flink.html)
*/
public class ReducePerformance {
-
+
public static void main(String[] args) throws Exception {
final int numElements = 40_000_000;
@@ -120,7 +120,7 @@ public class ReducePerformance {
int rem = numElements % numPartitions;
SplittableRandomIterator<T, B>[] res = new SplittableRandomIterator[numPartitions];
for (int i = 0; i < numPartitions; i++) {
- res[i] = new SplittableRandomIterator<T, B>(i < rem ? splitSize : splitSize + 1, (B)baseIterator.copy());
+ res[i] = new SplittableRandomIterator<T, B>(i < rem ? splitSize : splitSize + 1, (B) baseIterator.copy());
}
return res;
}
@@ -140,7 +140,6 @@ public class ReducePerformance {
CopyableIterator<T> copy();
}
-
private static final class TupleIntIntIterator implements CopyableIterator<Tuple2<Integer, Integer>>, Serializable {
private final int keyRange;
@@ -183,7 +182,6 @@ public class ReducePerformance {
}
}
-
private static final class TupleStringIntIterator implements CopyableIterator<Tuple2<String, Integer>>, Serializable {
private final int keyRange;
@@ -226,7 +224,6 @@ public class ReducePerformance {
}
}
-
private static final class SumReducer<K> implements ReduceFunction<Tuple2<K, Integer>> {
@Override
public Tuple2<K, Integer> reduce(Tuple2<K, Integer> a, Tuple2<K, Integer> b) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 90dbe80..c7f43fa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -31,24 +31,27 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import static org.junit.Assert.fail;
+/**
+ * Manual test to evaluate impact of checkpointing on latency.
+ */
public class StreamingScalabilityAndLatency {
-
+
public static void main(String[] args) throws Exception {
if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
}
-
- final int TASK_MANAGERS = 1;
- final int SLOTS_PER_TASK_MANAGER = 80;
- final int PARALLELISM = TASK_MANAGERS * SLOTS_PER_TASK_MANAGER;
+
+ final int taskManagers = 1;
+ final int slotsPerTaskManager = 80;
+ final int parallelism = taskManagers * slotsPerTaskManager;
LocalFlinkMiniCluster cluster = null;
try {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS);
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, taskManagers);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
config.setInteger("taskmanager.net.server.numThreads", 1);
@@ -56,8 +59,8 @@ public class StreamingScalabilityAndLatency {
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
-
- runPartitioningProgram(cluster.getLeaderRPCPort(), PARALLELISM);
+
+ runPartitioningProgram(cluster.getLeaderRPCPort(), parallelism);
}
catch (Exception e) {
e.printStackTrace();
@@ -69,7 +72,7 @@ public class StreamingScalabilityAndLatency {
}
}
}
-
+
private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
env.setParallelism(parallelism);
@@ -83,23 +86,22 @@ public class StreamingScalabilityAndLatency {
.map(new IdMapper<Tuple2<Long, Long>>())
.keyBy(0)
.addSink(new TimestampingSink());
-
+
env.execute("Partitioning Program");
}
-
- public static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {
+
+ private static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {
private static final long serialVersionUID = -151782334777482511L;
private volatile boolean running = true;
-
-
+
@Override
public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-
+
long num = 100;
long counter = (long) (Math.random() * 4096);
-
+
while (running) {
if (num < 100) {
num++;
@@ -119,14 +121,14 @@ public class StreamingScalabilityAndLatency {
running = false;
}
}
-
- public static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {
+
+ private static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {
private static final long serialVersionUID = 1876986644706201196L;
private long maxLatency;
- private long count;
-
+ private long count;
+
@Override
public void invoke(Tuple2<Long, Long> value) {
long ts = value.f1;
@@ -134,7 +136,7 @@ public class StreamingScalabilityAndLatency {
long diff = System.currentTimeMillis() - ts;
maxLatency = Math.max(diff, maxLatency);
}
-
+
count++;
if (count == 5000) {
System.out.println("Max latency: " + maxLatency);
@@ -144,7 +146,7 @@ public class StreamingScalabilityAndLatency {
}
}
- public static class IdMapper<T> implements MapFunction<T, T> {
+ private static class IdMapper<T> implements MapFunction<T, T> {
private static final long serialVersionUID = -6543809409233225099L;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
index 1c5744d..bd5123a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/package-info.java
@@ -21,4 +21,5 @@
* need to be manually invoked, because they are extremely heavy, time intensive,
* of require larger-than-usual JVMs.
*/
-package org.apache.flink.test.manual;
\ No newline at end of file
+
+package org.apache.flink.test.manual;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index eea2509..1fb5e65 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -18,13 +18,10 @@
package org.apache.flink.test.misc;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -33,6 +30,7 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -41,6 +39,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
/**
* This test verifies that the auto parallelism is properly forwarded to the runtime.
*/
@@ -79,7 +80,6 @@ public class AutoParallelismITCase extends TestLogger {
}
}
-
@Test
public void testProgramWithAutoParallelism() {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
index 39a08d2..b8f1d80 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -24,8 +24,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.test.util.JavaProgramTestBase;
+
import org.junit.Assert;
+/**
+ * Integration tests for custom {@link Partitioner}.
+ */
@SuppressWarnings("serial")
public class CustomPartitioningITCase extends JavaProgramTestBase {
@@ -36,17 +40,17 @@ public class CustomPartitioningITCase extends JavaProgramTestBase {
if (!isCollectionExecution()) {
Assert.assertTrue(env.getParallelism() > 1);
}
-
+
env.generateSequence(1, 1000)
.partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>())
.map(new FailExceptInPartitionZeroMapper())
.output(new DiscardingOutputFormat<Long>());
-
+
env.execute();
}
-
- public static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> {
-
+
+ private static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> {
+
@Override
public Long map(Long value) throws Exception {
if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
@@ -56,15 +60,15 @@ public class CustomPartitioningITCase extends JavaProgramTestBase {
}
}
}
-
- public static class AllZeroPartitioner implements Partitioner<Long> {
+
+ private static class AllZeroPartitioner implements Partitioner<Long> {
@Override
public int partition(Long key, int numPartitions) {
return 0;
}
}
-
- public static class IdKeySelector<T> implements KeySelector<T, T> {
+
+ private static class IdKeySelector<T> implements KeySelector<T, T> {
@Override
public T getKey(T value) {
return value;
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 76480ba..1532741 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.types.Value;
-
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -41,11 +41,15 @@ import java.io.IOException;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Test for proper error messages in case user-defined serialization is broken
+ * and detected in the network stack.
+ */
@SuppressWarnings("serial")
public class CustomSerializationITCase extends TestLogger {
private static final int PARLLELISM = 5;
-
+
private static LocalFlinkMiniCluster cluster;
private static TestEnvironment env;
@@ -66,13 +70,13 @@ public class CustomSerializationITCase extends TestLogger {
cluster.shutdown();
cluster = null;
}
-
+
@Test
public void testIncorrectSerializer1() {
try {
env.setParallelism(PARLLELISM);
env.getConfig().disableSysoutLogging();
-
+
env
.generateSequence(1, 10 * PARLLELISM)
.map(new MapFunction<Long, ConsumesTooMuch>() {
@@ -83,7 +87,7 @@ public class CustomSerializationITCase extends TestLogger {
})
.rebalance()
.output(new DiscardingOutputFormat<ConsumesTooMuch>());
-
+
env.execute();
}
catch (JobExecutionException e) {
@@ -186,11 +190,14 @@ public class CustomSerializationITCase extends TestLogger {
fail(e.getMessage());
}
}
-
+
// ------------------------------------------------------------------------
// Custom Data Types with broken Serialization Logic
// ------------------------------------------------------------------------
-
+
+ /**
+ * {@link Value} reading more data than written.
+ */
public static class ConsumesTooMuch implements Value {
@Override
@@ -206,6 +213,9 @@ public class CustomSerializationITCase extends TestLogger {
}
}
+ /**
+ * {@link Value} reading more buffers than written.
+ */
public static class ConsumesTooMuchSpanning implements Value {
@Override
@@ -221,6 +231,9 @@ public class CustomSerializationITCase extends TestLogger {
}
}
+ /**
+ * {@link Value} reading less data than written.
+ */
public static class ConsumesTooLittle implements Value {
@Override
@@ -236,6 +249,9 @@ public class CustomSerializationITCase extends TestLogger {
}
}
+ /**
+ * {@link Value} reading fewer buffers than written.
+ */
public static class ConsumesTooLittleSpanning implements Value {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
index fa1fcb6..c004759 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
@@ -21,20 +21,23 @@ package org.apache.flink.test.misc;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Test TypeInfo serializer tree.
+ */
public class GenericTypeInfoTest {
@Test
public void testSerializerTree() {
@SuppressWarnings("unchecked")
- TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti =
- (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>)
+ TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti =
+ (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>)
TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
-
+
String serTree = Utils.getSerializerTree(ti);
// We can not test against the entire output because the fields of 'String' differ
// between java versions
@@ -67,7 +70,7 @@ public class GenericTypeInfoTest {
" lowestSetBit:int\n" +
" firstNonzeroIntNum:int\n" +
" mixed:java.util.List\n" +
- " makeMeGeneric:org.apache.flink.test.javaApiOperators.util.CollectionDataSets$PojoWithDateAndEnum\n" +
+ " makeMeGeneric:org.apache.flink.test.operators.util.CollectionDataSets$PojoWithDateAndEnum\n" +
" group:java.lang.String\n"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 7dab0f1..00b4485 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -30,24 +30,26 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
-
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the system behavior in multiple corner cases
* - when null records are passed through the system.
* - when disjoint dataflows are executed
* - when accumulators are used chained after a non-udf operator.
- *
- * The tests are bundled into one class to reuse the same test cluster. This speeds
+ *
+ * <p>The tests are bundled into one class to reuse the same test cluster. This speeds
* up test execution, as the majority of the test time goes usually into starting/stopping the
* test cluster.
*/
@@ -59,7 +61,7 @@ public class MiscellaneousIssuesITCase extends TestLogger {
private static LocalFlinkMiniCluster cluster;
private static TestEnvironment env;
-
+
@BeforeClass
public static void startCluster() {
Configuration config = new Configuration();
@@ -72,13 +74,13 @@ public class MiscellaneousIssuesITCase extends TestLogger {
env = new TestEnvironment(cluster, PARALLELISM, false);
}
-
+
@AfterClass
public static void shutdownCluster() {
cluster.shutdown();
cluster = null;
}
-
+
@Test
public void testNullValues() {
try {
@@ -128,13 +130,13 @@ public class MiscellaneousIssuesITCase extends TestLogger {
@Test
public void testAccumulatorsAfterNoOp() {
-
- final String ACC_NAME = "test_accumulator";
-
+
+ final String accName = "test_accumulator";
+
try {
env.setParallelism(6);
env.getConfig().disableSysoutLogging();
-
+
env.generateSequence(1, 1000000)
.rebalance()
.flatMap(new RichFlatMapFunction<Long, Long>() {
@@ -143,7 +145,7 @@ public class MiscellaneousIssuesITCase extends TestLogger {
@Override
public void open(Configuration parameters) {
- counter = getRuntimeContext().getLongCounter(ACC_NAME);
+ counter = getRuntimeContext().getLongCounter(accName);
}
@Override
@@ -154,8 +156,8 @@ public class MiscellaneousIssuesITCase extends TestLogger {
.output(new DiscardingOutputFormat<Long>());
JobExecutionResult result = env.execute();
-
- assertEquals(1000000L, result.getAllAccumulatorResults().get(ACC_NAME));
+
+ assertEquals(1000000L, result.getAllAccumulatorResults().get(accName));
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index a5103cc..fd556d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,35 +32,41 @@ import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Test that runs an iterative job after a failure in another iterative job.
+ * This test validates that task slots in co-location constraints are properly
+ * freed in the presence of failures.
+ */
public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
private static final int PARALLELISM = 16;
@Test
public void testSuccessfulProgramAfterFailure() {
LocalFlinkMiniCluster cluster = null;
-
+
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
-
+
cluster = new LocalFlinkMiniCluster(config, false);
cluster.start();
TestEnvironment env = new TestEnvironment(cluster, PARALLELISM, false);
-
+
try {
runConnectedComponents(env);
}
@@ -68,7 +74,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
e.printStackTrace();
fail("Program Execution should have succeeded.");
}
-
+
try {
runKMeans(env);
fail("This program execution should have failed.");
@@ -76,7 +82,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
catch (JobExecutionException e) {
assertTrue(e.getCause().getMessage().contains("Insufficient number of network buffers"));
}
-
+
try {
runConnectedComponents(env);
}
@@ -95,9 +101,9 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
}
}
}
-
+
private static void runConnectedComponents(ExecutionEnvironment env) throws Exception {
-
+
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
@@ -166,7 +172,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
.map(new KMeans.SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
clusteredPoints.output(new DiscardingOutputFormat<Tuple2<Integer, KMeans.Point>>());
-
+
env.execute("KMeans Example");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
new file mode 100644
index 0000000..b4bd213
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/AggregateITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.ValueCollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for full, grouped and nested aggregations over tuples of boxed Java types and of mutable value types.
+ */
+@RunWith(Parameterized.class)
+public class AggregateITCase extends MultipleProgramsTestBase {
+
+ public AggregateITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testFullAggregate() throws Exception {
+ /*
+ * Full aggregate: SUM of field 0 and MAX of field 1 over the whole data set.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> aggregateDs = ds
+ .aggregate(Aggregations.SUM, 0)
+ .and(Aggregations.MAX, 1)
+ .project(0, 1);
+
+ List<Tuple2<Integer, Long>> result = aggregateDs.collect();
+
+ String expected = "231,6\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testFullAggregateOfMutableValueTypes() throws Exception {
+ /*
+ * Same as testFullAggregate, but on IntValue/LongValue/StringValue tuples.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
+ .aggregate(Aggregations.SUM, 0)
+ .and(Aggregations.MAX, 1)
+ .project(0, 1);
+
+ List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+ String expected = "231,6\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testGroupedAggregate() throws Exception {
+ /*
+ * Grouped aggregate: SUM of field 0 per field-1 key, projected to (key, sum).
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.SUM, 0)
+ .project(1, 0);
+
+ List<Tuple2<Long, Integer>> result = aggregateDs.collect();
+
+ String expected = "1,1\n" +
+ "2,5\n" +
+ "3,15\n" +
+ "4,34\n" +
+ "5,65\n" +
+ "6,111\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testGroupedAggregateOfMutableValueTypes() throws Exception {
+ /*
+ * Same as testGroupedAggregate on value types; project(1, 0) swaps fields, yielding (LongValue, IntValue).
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<LongValue, IntValue>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.SUM, 0)
+ .project(1, 0);
+
+ List<Tuple2<LongValue, IntValue>> result = aggregateDs.collect();
+
+ String expected = "1,1\n" +
+ "2,5\n" +
+ "3,15\n" +
+ "4,34\n" +
+ "5,65\n" +
+ "6,111\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testNestedAggregate() throws Exception {
+ /*
+ * Nested aggregate: MIN of field 0 per field-1 key, then MIN again over those per-key results.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.MIN, 0)
+ .aggregate(Aggregations.MIN, 0)
+ .project(0);
+
+ List<Tuple1<Integer>> result = aggregateDs.collect();
+
+ String expected = "1\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testNestedAggregateOfMutableValueTypes() throws Exception {
+ /*
+ * Same as testNestedAggregate, but on IntValue/LongValue/StringValue tuples.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.MIN, 0)
+ .aggregate(Aggregations.MIN, 0)
+ .project(0);
+
+ List<Tuple1<IntValue>> result = aggregateDs.collect();
+
+ String expected = "1\n";
+
+ compareResultAsTuples(result, expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
new file mode 100644
index 0000000..4108b24
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+
+/**
+ * Integration tests for {@link CoGroupFunction} with secondary sorting of both co-grouped inputs.
+ *
+ * <p>The tuple input is grouped on field 1 and each group is sorted on field 0 in descending order.
+ * The POJO input is grouped on field "b" and each group is sorted on "c" ascending, then on "a"
+ * descending. The validating function asserts that every group arrives in exactly that order.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupGroupSortITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
+			new Tuple2<Long, Long>(0L, 5L),
+			new Tuple2<Long, Long>(0L, 4L),
+			new Tuple2<Long, Long>(0L, 3L),
+			new Tuple2<Long, Long>(0L, 2L),
+			new Tuple2<Long, Long>(0L, 1L),
+			new Tuple2<Long, Long>(1L, 10L),
+			new Tuple2<Long, Long>(1L, 8L),
+			new Tuple2<Long, Long>(1L, 9L),
+			new Tuple2<Long, Long>(1L, 7L));
+
+		DataSet<TestPojo> input2 = env.fromElements(
+			new TestPojo(0L, 10L, 3L),
+			new TestPojo(0L, 8L, 3L),
+			new TestPojo(0L, 10L, 1L),
+			new TestPojo(0L, 9L, 0L),
+			new TestPojo(0L, 8L, 2L),
+			new TestPojo(0L, 8L, 4L),
+			new TestPojo(1L, 10L, 3L),
+			new TestPojo(1L, 8L, 3L),
+			new TestPojo(1L, 10L, 1L),
+			new TestPojo(1L, 9L, 0L),
+			new TestPojo(1L, 8L, 2L),
+			new TestPojo(1L, 8L, 4L));
+
+		input1.coGroup(input2)
+			.where(1).equalTo("b")
+			.sortFirstGroup(0, Order.DESCENDING)
+			.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
+			.with(new ValidatingCoGroup())
+			.output(new DiscardingOutputFormat<NullValue>());
+
+		env.execute();
+	}
+
+	/**
+	 * Asserts the secondary sort order of both groups; emits no records.
+	 */
+	private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
+
+		@Override
+		public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
+			validateTupleGroup(first);
+			validatePojoGroup(second);
+		}
+
+		/**
+		 * Checks that the tuple group is sorted on field 0 in descending order. Field 0 is the field
+		 * passed to sortFirstGroup. Field 1 is the grouping key and therefore constant within a group.
+		 */
+		private static void validateTupleGroup(Iterable<Tuple2<Long, Long>> group) {
+			long lastF0 = Long.MAX_VALUE;
+			for (Tuple2<Long, Long> t : group) {
+				long current = t.f0;
+				Assert.assertTrue("tuple group not sorted descending on field 0", current <= lastF0);
+				lastF0 = current;
+			}
+		}
+
+		/**
+		 * Checks that the POJO group is sorted on "c" ascending, then on "a" descending.
+		 */
+		private static void validatePojoGroup(Iterable<TestPojo> group) {
+			// Copy the primitive fields rather than keeping a reference to the previous element.
+			// With object reuse enabled the runtime may hand out the same instance for every
+			// element, which would make a reference-based comparison trivially true.
+			long lastA = Long.MAX_VALUE;
+			long lastC = Long.MIN_VALUE;
+			for (TestPojo current : group) {
+				Assert.assertTrue("pojo group not sorted ascending on c", current.c >= lastC);
+				Assert.assertTrue("pojo group not sorted descending on a within equal c",
+					current.c != lastC || current.a <= lastA);
+				lastA = current.a;
+				lastC = current.c;
+			}
+		}
+	}
+
+	/**
+	 * Test POJO with three public long fields; "b" is the co-group key.
+	 */
+	public static class TestPojo implements Cloneable {
+		public long a;
+		public long b;
+		public long c;
+
+		public TestPojo() {}
+
+		public TestPojo(long a, long b, long c) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
new file mode 100644
index 0000000..453f525
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -0,0 +1,989 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.operators.util.CollectionDataSets.POJO;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link CoGroupFunction} and {@link RichCoGroupFunction}.
+ */
+@RunWith(Parameterized.class)
+public class CoGroupITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Creates the test for one execution mode supplied by the {@link Parameterized} runner.
+	 */
+	public CoGroupITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * CoGroup on two 5-tuple inputs keyed by field position 0; {@code Tuple5CoGroup} sums field 2
+	 * of both sides per key.
+	 */
+	@Test
+	public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple2<Integer, Integer>> coGrouped = left
+			.coGroup(right)
+			.where(0)
+			.equalTo(0)
+			.with(new Tuple5CoGroup());
+
+		List<Tuple2<Integer, Integer>> actual = coGrouped.collect();
+
+		String expected = "1,0\n" +
+			"2,6\n" +
+			"3,24\n" +
+			"4,60\n" +
+			"5,120\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * CoGroup on two custom type inputs, both keyed by a {@link KeySelector} that extracts
+	 * {@code myInt}.
+	 */
+	@Test
+	public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> left = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> right = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGrouped = left
+			.coGroup(right)
+			.where(new KeySelector4())
+			.equalTo(new KeySelector5())
+			.with(new CustomTypeCoGroup());
+
+		List<CustomType> actual = coGrouped.collect();
+
+		String expected = "1,0,test\n" +
+			"2,6,test\n" +
+			"3,24,test\n" +
+			"4,60,test\n" +
+			"5,120,test\n" +
+			"6,210,test\n";
+
+		compareResultAsText(actual, expected);
+	}
+
+	/**
+	 * Uses {@code myInt} of a {@link CustomType} as the co-group key.
+	 */
+	private static class KeySelector4 implements KeySelector<CustomType, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(CustomType value) {
+			return value.myInt;
+		}
+	}
+
+	/**
+	 * Uses {@code myInt} of a {@link CustomType} as the co-group key.
+	 */
+	private static class KeySelector5 implements KeySelector<CustomType, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(CustomType value) {
+			return value.myInt;
+		}
+	}
+
+	/**
+	 * Checks the co-group result when the UDF forwards the left input objects unchanged (only
+	 * those with field 0 below 6).
+	 */
+	@Test
+	public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> left = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> right = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = left
+			.coGroup(right)
+			.where(0)
+			.equalTo(0)
+			.with(new Tuple3ReturnLeft());
+
+		List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+		String expected = "1,1,Hi\n" +
+			"2,2,Hello\n" +
+			"3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" +
+			"5,3,I am fine.\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+ @Test
+ public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
+ /*
+ * check correctness of cogroup if UDF returns right input objects
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
+
+ List<Tuple5<Integer, Long, Integer, String, Long>> result = coGroupDs.collect();
+
+ String expected = "1,1,0,Hallo,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "3,4,3,Hallo Welt wie gehts?,2\n" +
+ "3,5,4,ABC,2\n" +
+ "3,6,5,BCD,3\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+	/**
+	 * CoGroup with a broadcast set: {@code Tuple5CoGroupBC} sums the broadcast integers named
+	 * "ints" in {@code open} and appends that sum (55 here) to every output record.
+	 */
+	@Test
+	public void testCoGroupWithBroadcastSet() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds
+			.coGroup(ds2)
+			.where(0)
+			.equalTo(0)
+			.with(new Tuple5CoGroupBC())
+			.withBroadcastSet(intDs, "ints");
+
+		List<Tuple3<Integer, Integer, Integer>> result = coGroupDs.collect();
+
+		String expected = "1,0,55\n" +
+			"2,6,55\n" +
+			"3,24,55\n" +
+			"4,60,55\n" +
+			"5,120,55\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	/**
+	 * CoGroup of a tuple input keyed by field position 2 with a custom type input keyed by a
+	 * {@link KeySelector} extracting {@code myInt}.
+	 */
+	@Test
+	public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+			throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> tuples = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<CustomType> customs = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = tuples
+			.coGroup(customs)
+			.where(2)
+			.equalTo(new KeySelector2())
+			.with(new MixedCoGroup());
+
+		List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+		String expected = "0,1,test\n" +
+			"1,2,test\n" +
+			"2,5,test\n" +
+			"3,15,test\n" +
+			"4,33,test\n" +
+			"5,63,test\n" +
+			"6,109,test\n" +
+			"7,4,test\n" +
+			"8,4,test\n" +
+			"9,4,test\n" +
+			"10,5,test\n" +
+			"11,5,test\n" +
+			"12,5,test\n" +
+			"13,5,test\n" +
+			"14,5,test\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * Uses {@code myInt} of a {@link CustomType} as the co-group key.
+	 */
+	private static class KeySelector2 implements KeySelector<CustomType, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(CustomType value) {
+			return value.myInt;
+		}
+	}
+
+	/**
+	 * CoGroup of a custom type input (first) keyed by a {@link KeySelector} extracting
+	 * {@code myInt}, with a tuple input (second) keyed by field position 2. This is the mirror
+	 * image of the tuple-first variant and must produce the same values.
+	 */
+	@Test
+	public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+			throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGroupDs = ds2
+			.coGroup(ds)
+			.where(new KeySelector3())
+			.equalTo(2)
+			.with(new MixedCoGroup2());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = "0,1,test\n" +
+			"1,2,test\n" +
+			"2,5,test\n" +
+			"3,15,test\n" +
+			"4,33,test\n" +
+			"5,63,test\n" +
+			"6,109,test\n" +
+			"7,4,test\n" +
+			"8,4,test\n" +
+			"9,4,test\n" +
+			"10,5,test\n" +
+			"11,5,test\n" +
+			"12,5,test\n" +
+			"13,5,test\n" +
+			"14,5,test\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	/**
+	 * Uses {@code myInt} of a {@link CustomType} as the co-group key.
+	 */
+	private static class KeySelector3 implements KeySelector<CustomType, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(CustomType value) {
+			return value.myInt;
+		}
+	}
+
+	/**
+	 * CoGroup with composite keys given as field positions: (0, 4) of the 5-tuples against
+	 * (0, 1) of the 3-tuples.
+	 */
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = fiveTuples
+			.coGroup(threeTuples)
+			.where(0, 4)
+			.equalTo(0, 1)
+			.with(new Tuple5Tuple3CoGroup());
+
+		List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+			"2,2,Hallo Welt\n" +
+			"3,2,Hallo Welt wie gehts?\n" +
+			"3,2,ABC\n" +
+			"5,3,HIJ\n" +
+			"5,3,IJK\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+	/**
+	 * CoGroup with composite keys produced by static nested {@link KeySelector} classes that
+	 * return {@link Tuple2} keys.
+	 */
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = fiveTuples
+			.coGroup(threeTuples)
+			.where(new KeySelector7())
+			.equalTo(new KeySelector8())
+			.with(new Tuple5Tuple3CoGroup());
+
+		List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+			"2,2,Hallo Welt\n" +
+			"3,2,Hallo Welt wie gehts?\n" +
+			"3,2,ABC\n" +
+			"5,3,HIJ\n" +
+			"5,3,IJK\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+ @Test
+ public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
+ /*
+ * CoGroup with multiple key fields, using anonymous inner classes for both key selectors and
+ * the co-group function. The closure cleaner is left at the environment's default setting here.
+ * The companion test testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner
+ * builds the same program with the cleaner disabled and expects an InvalidProgramException.
+ * The functions must therefore stay anonymous inner classes (not lambdas or static classes).
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ // left key: (f0, f4) of the 5-tuple
+ where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+ Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).
+ // right key: (f0, f1) of the 3-tuple
+ equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+ return new Tuple2<>(t.f0, t.f1);
+ }
+ }).
+ // emits (f0, f1) of every right tuple paired with every f3 string of the left group
+ with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+ @Override
+ public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple3<Integer, Long, String>> second,
+ Collector<Tuple3<Integer, Long, String>> out) {
+ // Strings are buffered (not tuples), so this is safe even if tuple objects are reused.
+ List<String> strs = new ArrayList<>();
+
+ for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+ strs.add(t.f3);
+ }
+
+ for (Tuple3<Integer, Long, String> t : second) {
+ for (String s : strs) {
+ out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+ }
+ }
+ }
+ });
+
+ List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+ String expected = "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+	/**
+	 * CoGroup with multiple key fields and the closure cleaner disabled. The key selectors and
+	 * the co-group function are anonymous inner classes that reference the enclosing test instance.
+	 * Without the cleaner, building the program is expected to fail with an
+	 * {@link InvalidProgramException} caused by a {@link java.io.NotSerializableException}.
+	 */
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableClosureCleaner();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+		try {
+			// The functions must stay anonymous inner classes: that is what this test is about.
+			ds1.coGroup(ds2).
+				where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+						Tuple2<Integer, Long>>() {
+					@Override
+					public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+						return new Tuple2<Integer, Long>(t.f0, t.f4);
+					}
+				}).
+				equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+
+					@Override
+					public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+						return new Tuple2<Integer, Long>(t.f0, t.f1);
+					}
+				}).
+				with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+					@Override
+					public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+							Iterable<Tuple3<Integer, Long, String>> second,
+							Collector<Tuple3<Integer, Long, String>> out) {
+						List<String> strs = new ArrayList<String>();
+
+						for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+							strs.add(t.f3);
+						}
+
+						for (Tuple3<Integer, Long, String> t : second) {
+							for (String s : strs) {
+								out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+							}
+						}
+					}
+				});
+			Assert.fail("Expected an InvalidProgramException because the closure cleaner is disabled");
+		} catch (InvalidProgramException ex) {
+			Assert.assertTrue(
+				"Expected a NotSerializableException as cause, but got: " + ex.getCause(),
+				ex.getCause() instanceof java.io.NotSerializableException);
+		}
+	}
+
+	/**
+	 * Builds the composite key (f0, f4) of a 5-tuple.
+	 */
+	private static class KeySelector7 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> tuple) {
+			return new Tuple2<>(tuple.f0, tuple.f4);
+		}
+	}
+
+	/**
+	 * Builds the composite key (f0, f1) of a 3-tuple.
+	 */
+	private static class KeySelector8 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> tuple) {
+			return new Tuple2<>(tuple.f0, tuple.f1);
+		}
+	}
+
+	/**
+	 * CoGroup on two custom type inputs keyed by the field expression "myInt".
+	 */
+	@Test
+	public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> left = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> right = CollectionDataSets.getCustomTypeDataSet(env);
+		DataSet<CustomType> coGrouped = left
+			.coGroup(right)
+			.where("myInt")
+			.equalTo("myInt")
+			.with(new CustomTypeCoGroup());
+
+		List<CustomType> actual = coGrouped.collect();
+
+		String expected = "1,0,test\n" +
+			"2,6,test\n" +
+			"3,24,test\n" +
+			"4,60,test\n" +
+			"5,120,test\n" +
+			"6,210,test\n";
+
+		compareResultAsText(actual, expected);
+	}
+
+	/**
+	 * CoGroup of a POJO input keyed by the nested field expression "nestedPojo.longNumber" with a
+	 * Tuple7 input keyed by field position 6.
+	 */
+	@Test
+	public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws
+			Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
+			.where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1());
+
+		List<CustomType> result = coGroupDs.collect();
+
+		String expected = "-1,20000,Flink\n" +
+			"-1,10000,Flink\n" +
+			"-1,30000,Flink\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	/**
+	 * Emits {@code (-1, key, "Flink")} for every (POJO, tuple) pair of a group, asserting that the
+	 * POJO's nested long and the tuple's field 6 agree.
+	 */
+	private static class CoGroup1 implements CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			// Group iterables handed to a CoGroupFunction may be traversable only once, so the right
+			// side must not be re-iterated inside the loop over the left side. Buffer only the key
+			// values: with object reuse the tuple instances themselves may be recycled.
+			List<Long> rightKeys = new ArrayList<>();
+			for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t : second) {
+				rightKeys.add(t.f6);
+			}
+
+			for (POJO p : first) {
+				for (Long rightKey : rightKeys) {
+					Assert.assertTrue("POJO key " + p.nestedPojo.longNumber + " != tuple key " + rightKey,
+						p.nestedPojo.longNumber == rightKey);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
+			}
+		}
+	}
+
+	/**
+	 * CoGroup of a POJO input keyed by a key selector function with a Tuple7 input keyed by field
+	 * position 6. The key selector is unnecessarily complicated on purpose: it wraps the key in a
+	 * {@link Tuple1}.
+	 */
+	@Test
+	public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGrouped = pojos
+			.coGroup(tuples)
+			.where(new KeySelector6())
+			.equalTo(6)
+			.with(new CoGroup3());
+
+		List<CustomType> actual = coGrouped.collect();
+
+		String expected = "-1,20000,Flink\n" +
+			"-1,10000,Flink\n" +
+			"-1,30000,Flink\n";
+
+		compareResultAsText(actual, expected);
+	}
+
+	/**
+	 * Wraps the POJO's {@code nestedPojo.longNumber} in a {@link Tuple1} key.
+	 */
+	private static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple1<Long> getKey(POJO pojo) throws Exception {
+			return new Tuple1<>(pojo.nestedPojo.longNumber);
+		}
+	}
+
+	/**
+	 * Emits {@code (-1, key, "Flink")} for every (POJO, tuple) pair of a group, asserting that the
+	 * POJO's nested long and the tuple's field 6 agree.
+	 */
+	private static class CoGroup3 implements CoGroupFunction<POJO, Tuple7<Integer,
+			String, Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			// Group iterables handed to a CoGroupFunction may be traversable only once, so the right
+			// side must not be re-iterated inside the loop over the left side. Buffer only the key
+			// values: with object reuse the tuple instances themselves may be recycled.
+			List<Long> rightKeys = new ArrayList<>();
+			for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t : second) {
+				rightKeys.add(t.f6);
+			}
+
+			for (POJO p : first) {
+				for (Long rightKey : rightKeys) {
+					Assert.assertTrue("POJO key " + p.nestedPojo.longNumber + " != tuple key " + rightKey,
+						p.nestedPojo.longNumber == rightKey);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
+			}
+		}
+	}
+
+	/**
+	 * CoGroup of a POJO input keyed by a simple key selector function (returning the nested long
+	 * directly) with a Tuple7 input keyed by field position 6.
+	 */
+	@Test
+	public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJO> pojos = CollectionDataSets.getSmallPojoDataSet(env);
+		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> tuples = CollectionDataSets.getSmallTuplebasedDataSet(env);
+		DataSet<CustomType> coGrouped = pojos
+			.coGroup(tuples)
+			.where(new KeySelector1())
+			.equalTo(6)
+			.with(new CoGroup2());
+
+		List<CustomType> actual = coGrouped.collect();
+
+		String expected = "-1,20000,Flink\n" +
+			"-1,10000,Flink\n" +
+			"-1,30000,Flink\n";
+
+		compareResultAsText(actual, expected);
+	}
+
+	/**
+	 * CoGroup of a tuple input keyed by field 0 with an atomic Integer input keyed by the
+	 * wildcard expression "*".
+	 */
+	@Test
+	public void testCoGroupWithAtomicType1() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> atoms = env.fromElements(0, 1, 2);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = tuples
+			.coGroup(atoms)
+			.where(0)
+			.equalTo("*")
+			.with(new CoGroupAtomic1());
+
+		List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+		String expected = "(1,1,Hi)\n" +
+			"(2,2,Hello)";
+
+		compareResultAsText(actual, expected);
+	}
+
+	/**
+	 * CoGroup of an atomic Integer input keyed by the wildcard expression "*" with a tuple input
+	 * keyed by field 0.
+	 */
+	@Test
+	public void testCoGroupWithAtomicType2() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> atoms = env.fromElements(0, 1, 2);
+		DataSet<Tuple3<Integer, Long, String>> tuples = CollectionDataSets.getSmall3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = atoms
+			.coGroup(tuples)
+			.where("*")
+			.equalTo(0)
+			.with(new CoGroupAtomic2());
+
+		List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+		String expected = "(1,1,Hi)\n" +
+			"(2,2,Hello)";
+
+		compareResultAsText(actual, expected);
+	}
+
+	/**
+	 * CoGroup on tuples with composite keys, where both inputs are first range-partitioned with
+	 * the same custom {@code TestDistribution} on their respective key fields.
+	 */
+	@Test
+	public void testCoGroupWithRangePartitioning() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+		env.setParallelism(4);
+		TestDistribution distribution = new TestDistribution();
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> partitionedFive =
+			DataSetUtils.partitionByRange(fiveTuples, distribution, 0, 4);
+		DataSet<Tuple3<Integer, Long, String>> partitionedThree =
+			DataSetUtils.partitionByRange(threeTuples, distribution, 0, 1);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = partitionedFive
+			.coGroup(partitionedThree)
+			.where(0, 4)
+			.equalTo(0, 1)
+			.with(new Tuple5Tuple3CoGroup());
+
+		List<Tuple3<Integer, Long, String>> actual = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+			"2,2,Hallo Welt\n" +
+			"3,2,Hallo Welt wie gehts?\n" +
+			"3,2,ABC\n" +
+			"5,3,HIJ\n" +
+			"5,3,IJK\n";
+
+		compareResultAsTuples(actual, expected);
+	}
+
+ // --------------------------------------------------------------------------------------------
+ // UDF classes
+ // --------------------------------------------------------------------------------------------
+
+	/**
+	 * Uses the POJO's {@code nestedPojo.longNumber} directly as the key.
+	 */
+	private static class KeySelector1 implements KeySelector<POJO, Long> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long getKey(POJO pojo) throws Exception {
+			return pojo.nestedPojo.longNumber;
+		}
+	}
+
+	/**
+	 * Emits {@code (-1, key, "Flink")} for every (POJO, tuple) pair of a group, asserting that the
+	 * POJO's nested long and the tuple's field 6 agree.
+	 */
+	private static class CoGroup2 implements CoGroupFunction<POJO, Tuple7<Integer, String,
+			Integer, Integer, Long, String, Long>, CustomType> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<POJO> first,
+				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
+				Collector<CustomType> out) throws Exception {
+			// Group iterables handed to a CoGroupFunction may be traversable only once, so the right
+			// side must not be re-iterated inside the loop over the left side. Buffer only the key
+			// values: with object reuse the tuple instances themselves may be recycled.
+			List<Long> rightKeys = new ArrayList<>();
+			for (Tuple7<Integer, String, Integer, Integer, Long, String, Long> t : second) {
+				rightKeys.add(t.f6);
+			}
+
+			for (POJO p : first) {
+				for (Long rightKey : rightKeys) {
+					Assert.assertTrue("POJO key " + p.nestedPojo.longNumber + " != tuple key " + rightKey,
+						p.nestedPojo.longNumber == rightKey);
+					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Emits {@code (key, sum)} per group, where sum adds up field 2 of every 5-tuple on both sides
+	 * and key is field 0 of the last tuple seen (0 if the group is empty on both sides).
+	 */
+	private static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple2<Integer, Integer>> out) {
+			int total = 0;
+			int key = 0;
+
+			for (Tuple5<Integer, Long, Integer, String, Long> left : first) {
+				total += left.f2;
+				key = left.f0;
+			}
+			for (Tuple5<Integer, Long, Integer, String, Long> right : second) {
+				total += right.f2;
+				key = right.f0;
+			}
+
+			Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>(key, total);
+			out.collect(result);
+		}
+	}
+
+	/**
+	 * Emits one {@code CustomType(key, sumOfMyLong, "test")} per group, combining both sides.
+	 */
+	private static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) {
+			CustomType result = new CustomType(0, 0, "test");
+			absorb(result, first);
+			absorb(result, second);
+			out.collect(result);
+		}
+
+		/** Adds each element's myLong to the target and takes over its myInt. */
+		private static void absorb(CustomType target, Iterable<CustomType> source) {
+			for (CustomType c : source) {
+				target.myInt = c.myInt;
+				target.myLong += c.myLong;
+			}
+		}
+	}
+
+	/**
+	 * Emits {@code (key, sum, "test")} per group: sum adds field 0 of the 5-tuples and myLong of
+	 * the custom types. The key is taken from the last element seen: field 2 of a 5-tuple or
+	 * myInt of a custom type.
+	 */
+	private static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<CustomType> second,
+				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			long total = 0;
+			int key = 0;
+
+			for (Tuple5<Integer, Long, Integer, String, Long> tuple : first) {
+				total += tuple.f0;
+				key = tuple.f2;
+			}
+			for (CustomType custom : second) {
+				key = custom.myInt;
+				total += custom.myLong;
+			}
+
+			out.collect(new Tuple3<Integer, Long, String>(key, total, "test"));
+		}
+	}
+
+	/**
+	 * Mirror of {@code MixedCoGroup} with the custom type input first; emits a
+	 * {@code CustomType(key, sum, "test")} per group.
+	 */
+	private static class MixedCoGroup2 implements CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<CustomType> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<CustomType> out) {
+			CustomType result = new CustomType(0, 0, "test");
+
+			for (CustomType custom : first) {
+				result.myInt = custom.myInt;
+				result.myLong += custom.myLong;
+			}
+			for (Tuple5<Integer, Long, Integer, String, Long> tuple : second) {
+				result.myInt = tuple.f2;
+				result.myLong += tuple.f0;
+			}
+
+			out.collect(result);
+		}
+	}
+
+	/**
+	 * Forwards the left-side input objects unchanged, skipping those whose field 0 is 6 or more.
+	 */
+	private static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first,
+				Iterable<Tuple3<Integer, Long, String>> second,
+				Collector<Tuple3<Integer, Long, String>> out) {
+			for (Tuple3<Integer, Long, String> left : first) {
+				if (left.f0 >= 6) {
+					continue;
+				}
+				out.collect(left);
+			}
+		}
+	}
+
+	/**
+	 * Forwards the right-side input objects unchanged, skipping those whose field 0 is 4 or more.
+	 */
+	private static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple5<Integer, Long, Integer, String, Long>> out) {
+			for (Tuple5<Integer, Long, Integer, String, Long> right : second) {
+				if (right.f0 >= 4) {
+					continue;
+				}
+				out.collect(right);
+			}
+		}
+	}
+
+	/**
+	 * Rich co-group that, per group, emits (f0 of the last record visited, sum of f2 over both sides,
+	 * sum of the integers in the "ints" broadcast variable).
+	 */
+	private static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		// Initial value; open() replaces it with the sum of the "ints" broadcast variable.
+		private int broadcast = 42;
+
+		@Override
+		public void open(Configuration config) {
+			Collection<Integer> values = getRuntimeContext().getBroadcastVariable("ints");
+			int total = 0;
+			for (int value : values) {
+				total += value;
+			}
+			broadcast = total;
+		}
+
+		@Override
+		public void coGroup(
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
+				Collector<Tuple3<Integer, Integer, Integer>> out) {
+			int key = 0;
+			int total = 0;
+
+			for (Tuple5<Integer, Long, Integer, String, Long> left : first) {
+				key = left.f0;
+				total += left.f2;
+			}
+
+			for (Tuple5<Integer, Long, Integer, String, Long> right : second) {
+				key = right.f0;
+				total += right.f2;
+			}
+
+			out.collect(new Tuple3<Integer, Integer, Integer>(key, total, broadcast));
+		}
+	}
+
+	/**
+	 * Per group, pairs every right-side 3-tuple with every string in the f3 field of the left-side
+	 * 5-tuples and emits (right.f0, right.f1, string) for each pair.
+	 */
+	private static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple3<Integer, Long, String>> second,
+				Collector<Tuple3<Integer, Long, String>> out) {
+			// The left strings are buffered because they are replayed once per right-side tuple.
+			List<String> leftStrings = new ArrayList<String>();
+			for (Tuple5<Integer, Long, Integer, String, Long> left : first) {
+				leftStrings.add(left.f3);
+			}
+
+			for (Tuple3<Integer, Long, String> right : second) {
+				for (int k = 0; k < leftStrings.size(); k++) {
+					out.collect(new Tuple3<Integer, Long, String>(right.f0, right.f1, leftStrings.get(k)));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Emits each left-side 3-tuple once for every right-side integer equal to its f0 field, so
+	 * duplicate integers produce duplicate output.
+	 */
+	private static class CoGroupAtomic1 implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			// Buffer the integers because they are scanned once per left-side tuple.
+			List<Integer> keys = new ArrayList<Integer>();
+			for (Integer key : second) {
+				keys.add(key);
+			}
+
+			for (Tuple3<Integer, Long, String> tuple : first) {
+				for (Integer key : keys) {
+					if (!tuple.f0.equals(key)) {
+						continue;
+					}
+					out.collect(tuple);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Emits each right-side 3-tuple once for every left-side integer equal to its f0 field, so
+	 * duplicate integers produce duplicate output.
+	 */
+	private static class CoGroupAtomic2 implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+			// Buffer the integers because they are scanned once per right-side tuple.
+			List<Integer> keys = new ArrayList<Integer>();
+			for (Integer key : first) {
+				keys.add(key);
+			}
+
+			for (Tuple3<Integer, Long, String> tuple : second) {
+				for (Integer key : keys) {
+					if (!tuple.f0.equals(key)) {
+						continue;
+					}
+					out.collect(tuple);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Test {@link DataDistribution} over two key fields (int, long) with four fixed bucket boundaries.
+	 *
+	 * <p>{@link #write} and {@link #read} are no-ops, so a deserialized instance always carries the
+	 * default {@link #boundaries}. All instances compare equal and therefore share one hash code.
+	 */
+	public static class TestDistribution implements DataDistribution {
+		public Object[][] boundaries = new Object[][]{
+			new Object[]{2, 2L},
+			new Object[]{5, 4L},
+			new Object[]{10, 12L},
+			new Object[]{21, 6L}
+		};
+
+		public TestDistribution() {}
+
+		/**
+		 * Returns the boundary stored at index {@code bucketNum}. {@code totalNumBuckets} is ignored,
+		 * so callers are assumed to request at most {@code boundaries.length} buckets.
+		 */
+		@Override
+		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+			return boundaries[bucketNum];
+		}
+
+		@Override
+		public int getNumberOfFields() {
+			return 2;
+		}
+
+		@Override
+		public TypeInformation[] getKeyTypes() {
+			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			// Nothing to serialize: the boundaries are fixed.
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			// Nothing to deserialize: the boundaries are fixed.
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestDistribution;
+		}
+
+		/**
+		 * Returns the same value for every instance, as required by the {@link Object#hashCode()}
+		 * contract, because {@link #equals(Object)} treats all instances as equal.
+		 */
+		@Override
+		public int hashCode() {
+			return TestDistribution.class.hashCode();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
new file mode 100644
index 0000000..6e61f60
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.RichCrossFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Integration tests for {@link CrossFunction} and {@link RichCrossFunction}.
+ *
+ * <p>Each test crosses small data sets from {@link CollectionDataSets}, collects the result, and
+ * compares it with an expected textual representation.
+ */
+@RunWith(Parameterized.class)
+public class CrossITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Expected output of {@link Tuple5Cross} applied to the small 5-tuple data set crossed with itself.
+	 * Plain {@code cross}, {@code crossWithHuge} and {@code crossWithTiny} must all produce this result.
+	 */
+	private static final String EXPECTED_TUPLE5_CROSS =
+			"0,HalloHallo\n" +
+			"1,HalloHallo Welt\n" +
+			"2,HalloHallo Welt wie\n" +
+			"1,Hallo WeltHallo\n" +
+			"2,Hallo WeltHallo Welt\n" +
+			"3,Hallo WeltHallo Welt wie\n" +
+			"2,Hallo Welt wieHallo\n" +
+			"3,Hallo Welt wieHallo Welt\n" +
+			"4,Hallo Welt wieHallo Welt wie\n";
+
+	public CrossITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
+		/*
+		 * check correctness of cross on two tuple inputs
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross());
+
+		List<Tuple2<Integer, String>> result = crossDs.collect();
+
+		compareResultAsTuples(result, EXPECTED_TUPLE5_CROSS);
+	}
+
+	@Test
+	public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception {
+		/*
+		 * check correctness of cross if UDF returns left input object
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft());
+
+		List<Tuple3<Integer, Long, String>> result = crossDs.collect();
+
+		String expected = "1,1,Hi\n" +
+				"1,1,Hi\n" +
+				"1,1,Hi\n" +
+				"2,2,Hello\n" +
+				"2,2,Hello\n" +
+				"2,2,Hello\n" +
+				"3,2,Hello world\n" +
+				"3,2,Hello world\n" +
+				"3,2,Hello world\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception {
+		/*
+		 * check correctness of cross if UDF returns right input object
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight());
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> result = crossDs.collect();
+
+		String expected = "1,1,0,Hallo,1\n" +
+				"1,1,0,Hallo,1\n" +
+				"1,1,0,Hallo,1\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,2,1,Hallo Welt,2\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"2,3,2,Hallo Welt wie,1\n" +
+				"2,3,2,Hallo Welt wie,1\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
+		/*
+		 * check correctness of cross with broadcast set
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
+
+		List<Tuple3<Integer, Integer, Integer>> result = crossDs.collect();
+
+		String expected = "2,0,55\n" +
+				"3,0,55\n" +
+				"3,0,55\n" +
+				"3,0,55\n" +
+				"4,1,55\n" +
+				"4,2,55\n" +
+				"3,0,55\n" +
+				"4,2,55\n" +
+				"4,4,55\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfCrossWithHuge() throws Exception {
+		/*
+		 * check correctness of crossWithHuge (only correctness of result -> should be the same as with normal cross)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross());
+
+		List<Tuple2<Integer, String>> result = crossDs.collect();
+
+		compareResultAsTuples(result, EXPECTED_TUPLE5_CROSS);
+	}
+
+	@Test
+	public void testCorrectnessOfCrossWithTiny() throws Exception {
+		/*
+		 * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross)
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross());
+
+		List<Tuple2<Integer, String>> result = crossDs.collect();
+
+		compareResultAsTuples(result, EXPECTED_TUPLE5_CROSS);
+	}
+
+	@Test
+	public void testProjectCrossOnATupleInput1() throws Exception {
+		/*
+		 * project cross on a tuple input 1
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2)
+				.projectFirst(2, 1)
+				.projectSecond(3)
+				.projectFirst(0)
+				.projectSecond(4, 1);
+
+		List<Tuple6<String, Long, String, Integer, Long, Long>> result = crossDs.collect();
+
+		String expected = "Hi,1,Hallo,1,1,1\n" +
+				"Hi,1,Hallo Welt,1,2,2\n" +
+				"Hi,1,Hallo Welt wie,1,1,3\n" +
+				"Hello,2,Hallo,2,1,1\n" +
+				"Hello,2,Hallo Welt,2,2,2\n" +
+				"Hello,2,Hallo Welt wie,2,1,3\n" +
+				"Hello world,2,Hallo,3,1,1\n" +
+				"Hello world,2,Hallo Welt,3,2,2\n" +
+				"Hello world,2,Hallo Welt wie,3,1,3\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testProjectCrossOnATupleInput2() throws Exception {
+		/*
+		 * project cross on a tuple input 2
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple6<String, String, Long, Long, Long, Integer>> crossDs = ds.cross(ds2)
+				.projectSecond(3)
+				.projectFirst(2, 1)
+				.projectSecond(4, 1)
+				.projectFirst(0);
+
+		List<Tuple6<String, String, Long, Long, Long, Integer>> result = crossDs.collect();
+
+		String expected = "Hallo,Hi,1,1,1,1\n" +
+				"Hallo Welt,Hi,1,2,2,1\n" +
+				"Hallo Welt wie,Hi,1,1,3,1\n" +
+				"Hallo,Hello,2,1,1,2\n" +
+				"Hallo Welt,Hello,2,2,2,2\n" +
+				"Hallo Welt wie,Hello,2,1,3,2\n" +
+				"Hallo,Hello world,2,1,1,3\n" +
+				"Hallo Welt,Hello world,2,2,2,3\n" +
+				"Hallo Welt wie,Hello world,2,1,3,3\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfDefaultCross() throws Exception {
+		/*
+		 * check correctness of default cross
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
+
+		List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> result = crossDs.collect();
+
+		String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
+				"(1,1,Hi),(1,1,0,Hallo,1)\n" +
+				"(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
+				"(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
+				"(2,2,Hello),(1,1,0,Hallo,1)\n" +
+				"(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
+				"(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
+				"(3,2,Hello world),(1,1,0,Hallo,1)\n" +
+				"(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception {
+		/*
+		 * check correctness of cross on two custom type inputs
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross());
+
+		List<CustomType> result = crossDs.collect();
+
+		String expected = "1,0,HiHi\n"
+				+ "2,1,HiHello\n"
+				+ "2,2,HiHello world\n"
+				+ "2,1,HelloHi\n"
+				+ "4,2,HelloHello\n"
+				+ "4,3,HelloHello world\n"
+				+ "2,2,Hello worldHi\n"
+				+ "4,3,Hello worldHello\n"
+				+ "4,4,Hello worldHello world";
+
+		compareResultAsText(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception {
+		/*
+		 * check correctness of cross a tuple input and a custom type input
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross());
+
+		List<Tuple3<Integer, Long, String>> result = crossDs.collect();
+
+		String expected = "2,0,HalloHi\n" +
+				"3,0,HalloHello\n" +
+				"3,0,HalloHello world\n" +
+				"3,0,Hallo WeltHi\n" +
+				"4,1,Hallo WeltHello\n" +
+				"4,2,Hallo WeltHello world\n" +
+				"3,0,Hallo Welt wieHi\n" +
+				"4,2,Hallo Welt wieHello\n" +
+				"4,4,Hallo Welt wieHello world\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	/**
+	 * Emits (first.f2 + second.f2, first.f3 + second.f3) for each pair of 5-tuples.
+	 */
+	private static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, String> cross(
+				Tuple5<Integer, Long, Integer, String, Long> first,
+				Tuple5<Integer, Long, Integer, String, Long> second)
+				throws Exception {
+
+			return new Tuple2<Integer, String>(first.f2 + second.f2, first.f3 + second.f3);
+		}
+
+	}
+
+	/**
+	 * Combines two {@link CustomType}s: ints multiplied, longs added, strings concatenated.
+	 */
+	private static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public CustomType cross(CustomType first, CustomType second)
+				throws Exception {
+
+			return new CustomType(first.myInt * second.myInt, first.myLong + second.myLong, first.myString + second.myString);
+		}
+
+	}
+
+	/**
+	 * Combines a 5-tuple with a {@link CustomType} into (f0 + myInt, f2 * myLong, f3 + myString).
+	 */
+	private static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Integer, Long, String> cross(
+				Tuple5<Integer, Long, Integer, String, Long> first,
+				CustomType second) throws Exception {
+
+			return new Tuple3<Integer, Long, String>(first.f0 + second.myInt, first.f2 * second.myLong, first.f3 + second.myString);
+		}
+
+	}
+
+	/**
+	 * Returns the left input object unchanged, ignoring the right input.
+	 */
+	private static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<Integer, Long, String> cross(
+				Tuple3<Integer, Long, String> first,
+				Tuple5<Integer, Long, Integer, String, Long> second) throws Exception {
+
+			return first;
+		}
+	}
+
+	/**
+	 * Returns the right input object unchanged, ignoring the left input.
+	 */
+	private static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple5<Integer, Long, Integer, String, Long> cross(
+				Tuple3<Integer, Long, String> first,
+				Tuple5<Integer, Long, Integer, String, Long> second)
+				throws Exception {
+
+			return second;
+		}
+
+	}
+
+	/**
+	 * Rich cross function emitting (first.f0 + second.f0, first.f2 * second.f2, sum of the "ints"
+	 * broadcast variable).
+	 */
+	private static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		// Initial value; open() replaces it with the sum of the "ints" broadcast variable.
+		private int broadcast = 42;
+
+		@Override
+		public void open(Configuration config) {
+
+			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+			int sum = 0;
+			for (Integer i : ints) {
+				sum += i;
+			}
+			broadcast = sum;
+
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, Integer> cross(
+				Tuple5<Integer, Long, Integer, String, Long> first,
+				Tuple5<Integer, Long, Integer, String, Long> second)
+				throws Exception {
+
+			return new Tuple3<Integer, Integer, Integer>(first.f0 + second.f0, first.f2 * second.f2, broadcast);
+		}
+	}
+}