You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/24 18:17:48 UTC
[05/16] flink git commit: Remove ITCases for Record API operators
Remove ITCases for Record API operators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50ced5e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50ced5e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50ced5e3
Branch: refs/heads/master
Commit: 50ced5e3dca0983352ac9eb2ae5ba1a7de66f25b
Parents: 3333f84
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 22 21:13:59 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 24 18:16:51 2015 +0100
----------------------------------------------------------------------
.../broadcastvars/BroadcastBranchingITCase.java | 2 +-
.../test/failingPrograms/TaskFailureITCase.java | 4 +-
.../test/operators/CoGroupGroupSortITCase.java | 122 ----------
.../flink/test/operators/CoGroupITCase.java | 202 ----------------
.../flink/test/operators/CrossITCase.java | 197 ----------------
.../apache/flink/test/operators/JoinITCase.java | 182 --------------
.../apache/flink/test/operators/MapITCase.java | 133 -----------
.../test/operators/MapPartitionITCase.java | 100 --------
.../flink/test/operators/ObjectReuseITCase.java | 236 -------------------
.../flink/test/operators/ReduceITCase.java | 171 --------------
.../flink/test/operators/UnionITCase.java | 182 --------------
.../flink/test/operators/UnionSinkITCase.java | 175 --------------
.../operators/io/ContractITCaseIOFormats.java | 85 -------
.../test/util/ContractITCaseIOFormats.java | 85 +++++++
14 files changed, 88 insertions(+), 1788 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
index 4868aff..69be0ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
+import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index fe98e18..efb1c32 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
+import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseInputFormat;
+import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseOutputFormat;
import org.apache.flink.test.util.FailingTestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
deleted file mode 100644
index 5b78a51..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CoGroupGroupSortITCase extends JavaProgramTestBase {
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-
- DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
- new Tuple2<Long, Long>(0L, 5L),
- new Tuple2<Long, Long>(0L, 4L),
- new Tuple2<Long, Long>(0L, 3L),
- new Tuple2<Long, Long>(0L, 2L),
- new Tuple2<Long, Long>(0L, 1L),
- new Tuple2<Long, Long>(1L, 10L),
- new Tuple2<Long, Long>(1L, 8L),
- new Tuple2<Long, Long>(1L, 9L),
- new Tuple2<Long, Long>(1L, 7L));
-
- DataSet<TestPojo> input2 = env.fromElements(
- new TestPojo(0L, 10L, 3L),
- new TestPojo(0L, 8L, 3L),
- new TestPojo(0L, 10L, 1L),
- new TestPojo(0L, 9L, 0L),
- new TestPojo(0L, 8L, 2L),
- new TestPojo(0L, 8L, 4L),
- new TestPojo(1L, 10L, 3L),
- new TestPojo(1L, 8L, 3L),
- new TestPojo(1L, 10L, 1L),
- new TestPojo(1L, 9L, 0L),
- new TestPojo(1L, 8L, 2L),
- new TestPojo(1L, 8L, 4L));
-
- input1.coGroup(input2)
- .where(1).equalTo("b")
- .sortFirstGroup(0, Order.DESCENDING)
- .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-
- .with(new ValidatingCoGroup())
- .output(new DiscardingOutputFormat<NullValue>());
-
- env.execute();
- }
-
-
- private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
-
- @Override
- public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
- // validate the tuple input, field 1, descending
- {
- long lastValue = Long.MAX_VALUE;
-
- for (Tuple2<Long, Long> t : first) {
- long current = t.f1;
- Assert.assertTrue(current <= lastValue);
- lastValue = current;
- }
- }
-
-
- // validate the pojo input
- {
- TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
-
- for (TestPojo current : second) {
- Assert.assertTrue(current.c >= lastValue.c);
- Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
-
- lastValue = current;
- }
- }
-
- }
- }
-
- public static class TestPojo implements Cloneable {
- public long a;
- public long b;
- public long c;
-
-
- public TestPojo() {}
-
- public TestPojo(long a, long b, long c) {
- this.a = a;
- this.b = b;
- this.c = c;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
deleted file mode 100644
index be05186..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class CoGroupITCase extends RecordAPITestBase {
-
- String leftInPath = null;
- String rightInPath = null;
- String resultPath = null;
-
- public CoGroupITCase(Configuration testConfig) {
- super(testConfig);
- }
-
- private static final String LEFT_IN = "1 1\n2 2\n3 3\n4 4\n1 2\n2 3\n3 4\n4 5\n" +
- "1 3\n2 4\n3 5\n4 6\n1 4\n2 5\n3 6\n4 7\n";
-
- private static final String RIGHT_IN = "1 1\n2 2\n3 3\n5 1\n1 1\n2 2\n3 3\n6 1\n" +
- "1 1\n2 2\n2 2\n7 1\n1 1\n2 2\n2 2\n8 1\n";
-
- private static final String RESULT = "1 6\n2 2\n3 12\n4 22\n5 -1\n6 -1\n7 -1\n8 -1\n";
-
- @Override
- protected void preSubmit() throws Exception {
- leftInPath = createTempFile("left_in.txt", LEFT_IN);
- rightInPath = createTempFile("right_in.txt", RIGHT_IN);
- resultPath = getTempDirPath("result");
- }
-
- public static class CoGroupTestInFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringValue keyString = new StringValue();
- private final StringValue valueString = new StringValue();
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- this.keyString.setValueAscii(bytes, offset, 1);
- this.valueString.setValueAscii(bytes, offset + 2, 1);
- target.setField(0, keyString);
- target.setField(1, valueString);
-
- return target;
- }
-
- }
-
- public static class CoGroupOutFormat extends FileOutputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringBuilder buffer = new StringBuilder();
- private final StringValue keyString = new StringValue();
- private final IntValue valueInteger = new IntValue();
-
- @Override
- public void writeRecord(Record record) throws IOException {
- this.buffer.setLength(0);
- this.buffer.append(record.getField(0, keyString).toString());
- this.buffer.append(' ');
- this.buffer.append(record.getField(1, valueInteger).getValue());
- this.buffer.append('\n');
-
- byte[] bytes = this.buffer.toString().getBytes();
- this.stream.write(bytes);
- }
- }
-
- public static class TestCoGrouper extends CoGroupFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue keyString = new StringValue();
- private StringValue valueString = new StringValue();
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-
- Record record = null;
- int sum = 0;
-
- while (records1.hasNext()) {
- record = records1.next();
- keyString = record.getField(0, keyString);
- valueString = record.getField(1, valueString);
- sum += Integer.parseInt(valueString.getValue());
- }
-
-
- while (records2.hasNext()) {
- record = records2.next();
- keyString = record.getField(0, keyString);
- valueString = record.getField(1, valueString);
- sum -= Integer.parseInt(valueString.getValue());
- }
- record.setField(1, new IntValue(sum));
-
- out.collect(record);
- }
-
- }
-
- @Override
- protected Plan getTestJob() {
- FileDataSource input_left = new FileDataSource(new CoGroupTestInFormat(), leftInPath);
- DelimitedInputFormat.configureDelimitedFormat(input_left)
- .recordDelimiter('\n');
- input_left.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
-
- FileDataSource input_right = new FileDataSource(new CoGroupTestInFormat(), rightInPath);
- DelimitedInputFormat.configureDelimitedFormat(input_right)
- .recordDelimiter('\n');
- input_right.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
-
- CoGroupOperator testCoGrouper = CoGroupOperator.builder(new TestCoGrouper(), StringValue.class, 0, 0)
- .build();
- testCoGrouper.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
- testCoGrouper.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
- config.getString("CoGroupTest#LocalStrategy", ""));
- testCoGrouper.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
- config.getString("CoGroupTest#ShipStrategy", ""));
-
- FileDataSink output = new FileDataSink(new CoGroupOutFormat(), resultPath);
- output.setParallelism(1);
-
- output.setInput(testCoGrouper);
- testCoGrouper.setFirstInput(input_left);
- testCoGrouper.setSecondInput(input_right);
-
- return new Plan(output);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(RESULT, resultPath);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
-
- String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
-
- for (String localStrategy : localStrategies) {
- for (String shipStrategy : shipStrategies) {
-
- Configuration config = new Configuration();
- config.setString("CoGroupTest#LocalStrategy", localStrategy);
- config.setString("CoGroupTest#ShipStrategy", shipStrategy);
- config.setInteger("CoGroupTest#NoSubtasks", 4);
-
- tConfigs.add(config);
- }
- }
-
- return toParameterList(tConfigs);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
deleted file mode 100644
index 3fde5a9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class CrossITCase extends RecordAPITestBase {
-
- private String leftInPath = null;
- private String rightInPath = null;
- private String resultPath = null;
-
- public CrossITCase(Configuration testConfig) {
- super(testConfig);
- }
-
- //private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n";
-
- //private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n";
-
- //private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
- // + "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
- // + "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n";
-
- //private static final String RESULT = "10 1\n10 1\n10 5\n10 5\n4 1\n4 1\n4 2\n4 2\n5 0\n5 0\n5 1\n," +
- // "5 1\n5 2\n5 2\n5 4\n5 4\n6 -1\n6 -1\n6 0\n6 0\n6 1\n6 1\n6 3\n6 3\n6 3\n6 3\n6 6\n6 6\n7 -1\n" +
- // "7 -1\n7 -2\n7 -2\n7 0\n7 0\n7 2\n7 2\n7 2\n7 2\n7 4\n7 4\n7 5\n7 5\n7 8\n7 8\n8 -1\n8 -1\n8 1\n" +
- // "8 1\n8 1\n8 1\n8 3\n8 3\n8 4\n8 4\n8 7\n8 7\n9 0\n9 0\n9 2\n9 2\n9 3\n9 3\n9 6\n9 6\n";
-
- //private static final String RESULT = "2 2\n4 4\n1 1\n3 3\n2 2\n4 4\n1 1\n3 3\n5 0\n5 1\n6 1\n 6 3\n" +
- // "7 2\n7 5\n8 3\n8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n" +
- // "6 6\n7 4\n7 8\n6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6\n5 0\n5 1\n6 1\n6 3\n7 2\n7 5\n 8 3\n" +
- // "8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n6 6\n7 4\n7 8\n" +
- // "6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6";
-
-
- private static final String LEFT_IN = "1 1\n2 2\n3 3\n";
- private static final String RIGHT_IN = "3 6\n4 4\n4 8\n";
-
- private static final String RESULT = "6 6\n7 5\n7 8\n7 4\n8 3\n8 7\n8 4\n9 2\n9 6\n";
-
- @Override
- protected void preSubmit() throws Exception {
- leftInPath = createTempFile("left_in.txt", LEFT_IN);
- rightInPath = createTempFile("right_in.txt", RIGHT_IN);
- resultPath = getTempDirPath("result");
- }
-
-
- public static class TestCross extends CrossFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue string = new StringValue();
- private IntValue integer = new IntValue();
-
- @Override
- public Record cross(Record record1, Record record2) throws Exception {
- string = record1.getField(1, string);
- int val1 = Integer.parseInt(string.toString());
- string = record2.getField(1, string);
- int val2 = Integer.parseInt(string.toString());
- string = record1.getField(0, string);
- int key1 = Integer.parseInt(string.toString());
- string = record2.getField(0, string);
- int key2 = Integer.parseInt(string.toString());
-
- string.setValue((key1 + key2 + 2) + "");
- integer.setValue(val2 - val1 + 1);
-
- record1.setField(0, string);
- record1.setField(1, integer);
-
- return record1;
- }
-
- }
-
- @Override
- protected Plan getTestJob() {
-
- FileDataSource input_left = new FileDataSource(
- new ContractITCaseInputFormat(), leftInPath);
- DelimitedInputFormat.configureDelimitedFormat(input_left)
- .recordDelimiter('\n');
- input_left.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
-
- FileDataSource input_right = new FileDataSource(
- new ContractITCaseInputFormat(), rightInPath);
- DelimitedInputFormat.configureDelimitedFormat(input_right)
- .recordDelimiter('\n');
- input_right.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
-
- CrossOperator testCross = CrossOperator.builder(new TestCross()).build();
- testCross.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
- testCross.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
- config.getString("CrossTest#LocalStrategy", ""));
- if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
- testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
- testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_FORWARD);
- } else if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_SECOND")) {
- testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
- testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_FORWARD);
- } else {
- testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
- config.getString("CrossTest#ShipStrategy", ""));
- }
-
- FileDataSink output = new FileDataSink(
- new ContractITCaseOutputFormat(), resultPath);
- output.setParallelism(1);
-
- output.setInput(testCross);
- testCross.setFirstInput(input_left);
- testCross.setSecondInput(input_right);
-
- return new Plan(output);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(RESULT, resultPath);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST,
- Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND,
- Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST,
- Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND };
-
- String[] shipStrategies = { "BROADCAST_FIRST", "BROADCAST_SECOND"
- // PactCompiler.HINT_SHIP_STRATEGY_BROADCAST
- // PactCompiler.HINT_SHIP_STRATEGY_SFR
- };
-
- for (String localStrategy : localStrategies) {
- for (String shipStrategy : shipStrategies) {
-
- Configuration config = new Configuration();
- config.setString("CrossTest#LocalStrategy", localStrategy);
- config.setString("CrossTest#ShipStrategy", shipStrategy);
- config.setInteger("CrossTest#NoSubtasks", 4);
-
- tConfigs.add(config);
- }
- }
-
- return toParameterList(tConfigs);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
deleted file mode 100644
index c2ec55a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class JoinITCase extends RecordAPITestBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(JoinITCase.class);
-
- String leftInPath = null;
- String rightInPath = null;
- String resultPath = null;
-
- public JoinITCase(Configuration testConfig) {
- super(testConfig);
- }
-
- private static final String LEFT_IN = "1 1\n2 2\n3 3\n4 4\n1 2\n2 3\n3 4\n4 5\n" +
- "1 3\n2 4\n3 5\n4 6\n1 4\n2 5\n3 6\n4 7\n";
-
- private static final String RIGHT_IN = "1 1\n2 2\n3 3\n5 1\n1 1\n2 2\n3 3\n6 1\n" +
- "1 1\n2 2\n2 2\n7 1\n1 1\n2 2\n2 2\n8 1\n";
-
- private static final String RESULT = "2 1\n2 1\n2 1\n2 1\n2 2\n2 2\n2 2\n2 2\n2 3\n2 3\n2 3\n2 3\n2 4\n2 4\n2 4\n2 4\n"
- + "4 1\n4 1\n4 2\n4 2\n4 3\n4 3\n4 4\n4 4\n"
- + "3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n";
-
- @Override
- protected void preSubmit() throws Exception {
- leftInPath = createTempFile("left_in.txt", LEFT_IN);
- rightInPath = createTempFile("right_in.txt", RIGHT_IN);
- resultPath = getTempDirPath("result");
- }
-
- public static class TestMatcher extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue keyString = new StringValue();
- private StringValue valueString = new StringValue();
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out)
- throws Exception {
- keyString = value1.getField(0, keyString);
- keyString.setValue(""+ (Integer.parseInt(keyString.getValue())+1));
- value1.setField(0, keyString);
- valueString = value1.getField(1, valueString);
- int val1 = Integer.parseInt(valueString.getValue())+2;
- valueString = value2.getField(1, valueString);
- int val2 = Integer.parseInt(valueString.getValue())+1;
-
- value1.setField(1, new IntValue(val1 - val2));
-
- out.collect(value1);
-
- if (LOG.isDebugEnabled())
- LOG.debug("Processed: [" + keyString.toString() + "," + val1 + "] + " +
- "[" + keyString.toString() + "," + val2 + "]");
- }
-
- }
-
- @Override
- protected Plan getTestJob() {
- FileDataSource input_left = new FileDataSource(
- new ContractITCaseInputFormat(), leftInPath);
- DelimitedInputFormat.configureDelimitedFormat(input_left)
- .recordDelimiter('\n');
- input_left.setParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
-
- FileDataSource input_right = new FileDataSource(
- new ContractITCaseInputFormat(), rightInPath);
- DelimitedInputFormat.configureDelimitedFormat(input_right)
- .recordDelimiter('\n');
- input_right.setParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
-
- JoinOperator testMatcher = JoinOperator.builder(new TestMatcher(), StringValue.class, 0, 0)
- .build();
- testMatcher.setParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
- testMatcher.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
- config.getString("MatchTest#LocalStrategy", ""));
- if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
- testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
- testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_FORWARD);
- } else if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_SECOND")) {
- testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_FORWARD);
- testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
- Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
- } else {
- testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
- config.getString("MatchTest#ShipStrategy", ""));
- }
-
- FileDataSink output = new FileDataSink(
- new ContractITCaseOutputFormat(), resultPath);
- output.setParallelism(1);
-
- output.setInput(testMatcher);
- testMatcher.setFirstInput(input_left);
- testMatcher.setSecondInput(input_right);
-
- return new Plan(output);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(RESULT, resultPath);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE,
- Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND };
-
- String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", "BROADCAST_SECOND"};
-
- for (String localStrategy : localStrategies) {
- for (String shipStrategy : shipStrategies) {
-
- Configuration config = new Configuration();
- config.setString("MatchTest#LocalStrategy", localStrategy);
- config.setString("MatchTest#ShipStrategy", shipStrategy);
- config.setInteger("MatchTest#NoSubtasks", 4);
-
- tConfigs.add(config);
- }
- }
-
- return toParameterList(tConfigs);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
deleted file mode 100644
index 28b9501..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class MapITCase extends RecordAPITestBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(MapITCase.class);
-
- String inPath = null;
- String resultPath = null;
-
- public MapITCase(Configuration testConfig) {
- super(testConfig);
- }
-
- private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
- "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" +
- "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
- private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
- @Override
- protected void preSubmit() throws Exception {
- inPath = createTempFile("in.txt", IN);
- resultPath = getTempDirPath("result");
- }
-
- public static class TestMapper extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue keyString = new StringValue();
- private StringValue valueString = new StringValue();
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- keyString = record.getField(0, keyString);
- valueString = record.getField(1, valueString);
-
- LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
-
- if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
-
- record.setField(0, valueString);
- record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10));
-
- out.collect(record);
- }
-
- }
- }
-
- @Override
- protected Plan getTestJob() {
- FileDataSource input = new FileDataSource(
- new ContractITCaseInputFormat(), inPath);
- DelimitedInputFormat.configureDelimitedFormat(input)
- .recordDelimiter('\n');
- input.setParallelism(config.getInteger("MapTest#NoSubtasks", 1));
-
- MapOperator testMapper = MapOperator.builder(new TestMapper()).build();
- testMapper.setParallelism(config.getInteger("MapTest#NoSubtasks", 1));
-
- FileDataSink output = new FileDataSink(
- new ContractITCaseOutputFormat(), resultPath);
- output.setParallelism(1);
-
- output.setInput(testMapper);
- testMapper.setInput(input);
-
- return new Plan(output);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(RESULT, resultPath);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
-
- Configuration config = new Configuration();
- config.setInteger("MapTest#NoSubtasks", 4);
- testConfigs.add(config);
-
- return toParameterList(testConfigs);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
deleted file mode 100644
index 3cb5580..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("serial")
-public class MapPartitionITCase extends JavaProgramTestBase {
-
- private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n"
- + "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n"
- + "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
- private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
-
- private List<Tuple2<String, String>> input = new ArrayList<Tuple2<String,String>>();
-
- private List<Tuple2<String, Integer>> expected = new ArrayList<Tuple2<String,Integer>>();
-
- private List<Tuple2<String, Integer>> result = new ArrayList<Tuple2<String,Integer>>();
-
-
- @Override
- protected void preSubmit() throws Exception {
-
- // create input
- for (String s :IN.split("\n")) {
- String[] fields = s.split(" ");
- input.add(new Tuple2<String, String>(fields[0], fields[1]));
- }
-
- // create expected
- for (String s : RESULT.split("\n")) {
- String[] fields = s.split(" ");
- expected.add(new Tuple2<String, Integer>(fields[0], Integer.parseInt(fields[1])));
- }
-
- }
-
- @Override
- protected void postSubmit() {
- compareResultCollections(expected, result, new TupleComparator<Tuple2<String, Integer>>());
- }
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<String, String>> data = env.fromCollection(input);
-
- data.mapPartition(new TestMapPartition()).output(new LocalCollectionOutputFormat<Tuple2<String,Integer>>(result));
-
- env.execute();
- }
-
-
- public static class TestMapPartition implements MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
-
- @Override
- public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<Tuple2<String, Integer>> out) {
- for (Tuple2<String, String> value : values) {
- String keyString = value.f0;
- String valueString = value.f1;
-
- int keyInt = Integer.parseInt(keyString);
- int valueInt = Integer.parseInt(valueString);
-
- if (keyInt + valueInt < 10) {
- out.collect(new Tuple2<String, Integer>(valueString, keyInt + 10));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
deleted file mode 100644
index faf6de5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * These check whether the object-reuse execution mode does really reuse objects.
- */
-@RunWith(Parameterized.class)
-public class ObjectReuseITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 4;
-
- private int curProgId = config.getInteger("ProgramId", -1);
- private String resultPath;
- private String expectedResult;
-
- private static String inReducePath;
- private static String inGroupReducePath;
-
- private String IN_REDUCE = "a,1\na,2\na,3\na,4\na,50\n";
- private String IN_GROUP_REDUCE = "a,1\na,2\na,3\na,4\na,5\n";
-
- public ObjectReuseITCase(Configuration config) {
- super(config);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- inReducePath = createTempFile("in_reduce.txt", IN_REDUCE);
- inGroupReducePath = createTempFile("in_group_reduce.txt", IN_GROUP_REDUCE);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void testProgram() throws Exception {
- expectedResult = Progs.runProgram(curProgId, resultPath);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Override
- protected boolean skipCollectionExecution() {
- return true;
- }
-
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
- }
-
- @SuppressWarnings({"unchecked", "serial"})
- private static class Progs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
-
- case 1: {
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
- DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws
- Exception {
- value2.f1 += value1.f1;
- return value2;
- }
-
- });
-
- result.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "a,100\n";
-
- }
-
- case 2: {
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
-
- DataSet<Tuple2<String, Integer>> result = input
- .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-
- @Override
- public Tuple2<String, Integer> reduce(
- Tuple2<String, Integer> value1,
- Tuple2<String, Integer> value2) throws Exception {
- value2.f1 += value1.f1;
- return value2;
- }
-
- });
-
- result.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "a,100\n";
-
- }
-
- case 3: {
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
-
- DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-
- @Override
- public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
- List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
- for (Tuple2<String, Integer> val : values) {
- list.add(val);
- }
-
- for (Tuple2<String, Integer> val : list) {
- out.collect(val);
- }
- }
-
- });
-
- result.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "a,4\n" +
- "a,4\n" +
- "a,5\n" +
- "a,5\n" +
- "a,5\n";
-
- }
-
- case 4: {
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
-
- DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-
- @Override
- public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
- List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
- for (Tuple2<String, Integer> val : values) {
- list.add(val);
- }
-
- for (Tuple2<String, Integer> val : list) {
- out.collect(val);
- }
- }
-
- });
-
- result.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "a,4\n" +
- "a,4\n" +
- "a,5\n" +
- "a,5\n" +
- "a,5\n";
-
- }
-
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
deleted file mode 100644
index f5511c8..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class ReduceITCase extends RecordAPITestBase {
-
- String inPath = null;
- String resultPath = null;
-
- public ReduceITCase(Configuration testConfig) {
- super(testConfig);
- }
-
- private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n1 1\n" +
- "2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n5 9\n7 7\n8 8\n" +
- "1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
- private static final String RESULT = "1 4\n2 18\n3 0\n4 28\n5 27\n6 15\n7 21\n8 32\n9 1\n";
-
- @Override
- protected void preSubmit() throws Exception {
- inPath = createTempFile("in.txt", IN);
- resultPath = getTempDirPath("result");
- }
-
- @ReduceOperator.Combinable
- public static class TestReducer extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue reduceValue = new StringValue();
- private StringValue combineValue = new StringValue();
-
- @Override
- public void combine(Iterator<Record> records, Collector<Record> out) {
- Record record = null;
- int sum = 0;
-
- while (records.hasNext()) {
- record = records.next();
- combineValue = record.getField(1, combineValue);
- sum += Integer.parseInt(combineValue.toString());
- }
- combineValue.setValue(sum + "");
- record.setField(1, combineValue);
- out.collect(record);
- }
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) {
- Record record = null;
- int sum = 0;
-
- while (records.hasNext()) {
- record = records.next();
- reduceValue = record.getField(1, reduceValue);
- sum += Integer.parseInt(reduceValue.toString());
- }
- record.setField(1, new IntValue(sum));
- out.collect(record);
- }
- }
-
- @Override
- protected JobGraph getJobGraph() throws Exception {
- FileDataSource input = new FileDataSource(
- new ContractITCaseInputFormat(), inPath);
- DelimitedInputFormat.configureDelimitedFormat(input)
- .recordDelimiter('\n');
- input.setParallelism(config.getInteger("ReduceTest#NoSubtasks", 1));
-
- ReduceOperator testReducer = ReduceOperator.builder(new TestReducer(), StringValue.class, 0)
- .build();
- testReducer.setParallelism(config.getInteger("ReduceTest#NoSubtasks", 1));
- testReducer.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
- config.getString("ReduceTest#LocalStrategy", ""));
- testReducer.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
- config.getString("ReduceTest#ShipStrategy", ""));
-
- FileDataSink output = new FileDataSink(
- new ContractITCaseOutputFormat(), resultPath);
- output.setParallelism(1);
-
- output.setInput(testReducer);
- testReducer.setInput(input);
-
- Plan plan = new Plan(output);
- plan.setExecutionConfig(new ExecutionConfig());
- Optimizer pc = new Optimizer(new DataStatistics(), this.config);
- OptimizedPlan op = pc.compile(plan);
-
- JobGraphGenerator jgg = new JobGraphGenerator();
- return jgg.compileJobGraph(op);
-
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(RESULT, resultPath);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT };
- String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
-
- for (String localStrategy : localStrategies) {
- for (String shipStrategy : shipStrategies) {
-
- Configuration config = new Configuration();
- config.setString("ReduceTest#LocalStrategy", localStrategy);
- config.setString("ReduceTest#ShipStrategy", shipStrategy);
- config.setInteger("ReduceTest#NoSubtasks", 4);
- tConfigs.add(config);
- }
- }
-
- return toParameterList(tConfigs);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
deleted file mode 100644
index b833421..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class UnionITCase extends RecordAPITestBase {
- private static final Logger LOG = LoggerFactory.getLogger(UnionITCase.class);
-
- String inPath = null;
- String emptyInPath = null;
- String resultPath = null;
-
- public UnionITCase(Configuration testConfig) {
- super(testConfig);
- }
-
- private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n1 1\n" +
- "2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n5 9\n7 7\n8 8\n" +
- "1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
- private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n" +
- "4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
- private static final String EMPTY_RESULT = "";
-
- private static final String MAP_RESULT_TWICE = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n" +
- "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
- @Override
- protected void preSubmit() throws Exception {
- inPath = createTempFile("in.txt", IN);
- emptyInPath = createTempFile("empty_in.txt", "");
- resultPath = getTempDirPath("result");
- }
-
- public static class TestMapper extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue keyString = new StringValue();
- private StringValue valueString = new StringValue();
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- keyString = record.getField(0, keyString);
- valueString = record.getField(1, valueString);
-
- if (LOG.isDebugEnabled())
- LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
-
- if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
-
- record.setField(0, valueString);
- record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10));
-
- out.collect(record);
- }
-
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected Plan getTestJob() {
- String input1Path = config.getString("UnionTest#Input1Path", "").equals("empty") ? emptyInPath : inPath;
- String input2Path = config.getString("UnionTest#Input2Path", "").equals("empty") ? emptyInPath : inPath;
-
- FileDataSource input1 = new FileDataSource(
- new ContractITCaseInputFormat(), input1Path);
- DelimitedInputFormat.configureDelimitedFormat(input1)
- .recordDelimiter('\n');
- input1.setParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
-
- FileDataSource input2 = new FileDataSource(
- new ContractITCaseInputFormat(), input2Path);
- DelimitedInputFormat.configureDelimitedFormat(input2)
- .recordDelimiter('\n');
- input2.setParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
-
- MapOperator testMapper = MapOperator.builder(new TestMapper()).build();
- testMapper.setParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
-
- FileDataSink output = new FileDataSink(
- new ContractITCaseOutputFormat(), resultPath);
- output.setParallelism(1);
-
- output.setInput(testMapper);
-
- testMapper.addInput(input1);
- testMapper.addInput(input2);
-
- return new Plan(output);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(config.getString("UnionTest#ExpectedResult", ""), resultPath);
-
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws IOException {
- LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
-
- //second input empty
- Configuration config = new Configuration();
- config.setInteger("UnionTest#NoSubtasks", 4);
- config.setString("UnionTest#ExpectedResult", RESULT);
- config.setString("UnionTest#Input1Path", "non-empty");
- config.setString("UnionTest#Input2Path", "empty");
- testConfigs.add(config);
-
-
- //first input empty
- config = new Configuration();
- config.setInteger("UnionTest#NoSubtasks", 4);
- config.setString("UnionTest#ExpectedResult", RESULT);
- config.setString("UnionTest#Input1Path", "empty");
- config.setString("UnionTest#Input2Path", "non-empty");
- testConfigs.add(config);
-
- //both inputs full
- config = new Configuration();
- config.setInteger("UnionTest#NoSubtasks", 4);
- config.setString("UnionTest#ExpectedResult", MAP_RESULT_TWICE);
- config.setString("UnionTest#Input1Path", "non-empty");
- config.setString("UnionTest#Input2Path", "non-empty");
- testConfigs.add(config);
-
- //both inputs empty
- config = new Configuration();
- config.setInteger("UnionTest#NoSubtasks", 4);
- config.setString("UnionTest#ExpectedResult", EMPTY_RESULT);
- config.setString("UnionTest#Input1Path", "empty");
- config.setString("UnionTest#Input2Path", "empty");
- testConfigs.add(config);
-
- return toParameterList(testConfigs);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
deleted file mode 100644
index 35cb8af..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class UnionSinkITCase extends RecordAPITestBase {
-
- public UnionSinkITCase(Configuration testConfig) {
- super(testConfig);
- setTaskManagerNumSlots(parallelism);
- }
-
- private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
- "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n" +
- "1 1\n2 2\n2 2\n3 0\n4 4\n5 9\n7 7\n8 8\n" +
- "1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
- private static final String MAP_RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
- private static final String EMPTY_MAP_RESULT = "";
-
- private static final String MAP_RESULT_TWICE = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n" +
- "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
- private String textInput;
-
- private String emptyInput;
-
- private String resultDir;
-
- @Override
- protected void preSubmit() throws Exception {
- textInput = createTempFile("textdata.txt", MAP_IN);
- emptyInput = createTempFile("emptyfile.txt", "");
- resultDir = getTempDirPath("result");
- }
-
- public static class TestMapper extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue keyString = new StringValue();
- private StringValue valueString = new StringValue();
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- keyString = record.getField(0, keyString);
- valueString = record.getField(1, valueString);
-
-
- if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
-
- record.setField(0, valueString);
- record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10));
-
- out.collect(record);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected JobGraph getJobGraph() throws Exception {
-
- String path1 = config.getBoolean("input1PathHasData", false) ? textInput : emptyInput;
- String path2 = config.getBoolean("input2PathHasData", false) ? textInput : emptyInput;
-
- FileDataSource input1 = new FileDataSource(new ContractITCaseInputFormat(), path1);
- FileDataSource input2 = new FileDataSource(new ContractITCaseInputFormat(), path2);
-
- MapOperator testMapper1 = MapOperator.builder(new TestMapper()).build();
- MapOperator testMapper2 = MapOperator.builder(new TestMapper()).build();
-
- FileDataSink output = new FileDataSink(new ContractITCaseOutputFormat(), resultDir);
-
- testMapper1.setInput(input1);
- testMapper2.setInput(input2);
-
- output.addInput(testMapper1);
- output.addInput(testMapper2);
-
- Plan plan = new Plan(output);
- plan.setExecutionConfig(new ExecutionConfig());
- plan.setDefaultParallelism(parallelism);
-
- Optimizer pc = new Optimizer(new DataStatistics(), this.config);
- OptimizedPlan op = pc.compile(plan);
-
- JobGraphGenerator jgg = new JobGraphGenerator();
- return jgg.compileJobGraph(op);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- String expectedResult = config.getString("UnionTest#ExpectedResult", null);
- if (expectedResult == null) {
- throw new Exception("Test corrupt, no expected return data set.");
- }
- compareResultsByLinesInMemory(expectedResult, resultDir);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() {
-
- //second input empty
- Configuration config1 = new Configuration();
- config1.setString("UnionTest#ExpectedResult", MAP_RESULT);
- config1.setBoolean("input1PathHasData", true);
- config1.setBoolean("input2PathHasData", false);
-
-
- //first input empty
- Configuration config2 = new Configuration();
- config2.setString("UnionTest#ExpectedResult", MAP_RESULT);
- config2.setBoolean("input1PathHasData", false);
- config2.setBoolean("input2PathHasData", true);
-
- //both inputs full
- Configuration config3 = new Configuration();
- config3.setString("UnionTest#ExpectedResult", MAP_RESULT_TWICE);
- config3.setBoolean("input1PathHasData", true);
- config3.setBoolean("input2PathHasData", true);
-
- //both inputs empty
- Configuration config4 = new Configuration();
- config4.setString("UnionTest#ExpectedResult", EMPTY_MAP_RESULT);
- config4.setBoolean("input1PathHasData", false);
- config4.setBoolean("input2PathHasData", false);
-
- return toParameterList(config1, config2, config3, config4);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java b/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java
deleted file mode 100644
index 7649299..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators.io;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-
-public class ContractITCaseIOFormats {
-
- private static final Logger LOG = LoggerFactory.getLogger(ContractITCaseIOFormats.class);
-
- public static class ContractITCaseInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringValue keyString = new StringValue();
- private final StringValue valueString = new StringValue();
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- this.keyString.setValueAscii(bytes, offset, 1);
- this.valueString.setValueAscii(bytes, offset + 2, 1);
- target.setField(0, keyString);
- target.setField(1, valueString);
-
- if (LOG.isDebugEnabled())
- LOG.debug("Read in: [" + keyString.getValue() + "," + valueString.getValue() + "]");
-
- return target;
- }
- }
-
- public static class ContractITCaseOutputFormat extends FileOutputFormat {
- private static final long serialVersionUID = 1L;
-
- private final StringBuilder buffer = new StringBuilder();
- private final StringValue keyString = new StringValue();
- private final IntValue valueInteger = new IntValue();
-
-
- public ContractITCaseOutputFormat() {
- setWriteMode(WriteMode.OVERWRITE);
- }
-
- @Override
- public void writeRecord(Record record) throws IOException {
- this.buffer.setLength(0);
- this.buffer.append(record.getField(0, keyString).toString());
- this.buffer.append(' ');
- this.buffer.append(record.getField(1, valueInteger).getValue());
- this.buffer.append('\n');
-
- byte[] bytes = this.buffer.toString().getBytes();
-
- if (LOG.isDebugEnabled())
- LOG.debug("Writing out: [" + keyString.toString() + "," + valueInteger.getValue() + "]");
-
- this.stream.write(bytes);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java b/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java
new file mode 100644
index 0000000..97aaaab
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.util;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.java.record.io.DelimitedInputFormat;
+import org.apache.flink.api.java.record.io.FileOutputFormat;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+
+public class ContractITCaseIOFormats {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContractITCaseIOFormats.class);
+
+ public static class ContractITCaseInputFormat extends DelimitedInputFormat {
+ private static final long serialVersionUID = 1L;
+
+ private final StringValue keyString = new StringValue();
+ private final StringValue valueString = new StringValue();
+
+ @Override
+ public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
+ this.keyString.setValueAscii(bytes, offset, 1);
+ this.valueString.setValueAscii(bytes, offset + 2, 1);
+ target.setField(0, keyString);
+ target.setField(1, valueString);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Read in: [" + keyString.getValue() + "," + valueString.getValue() + "]");
+
+ return target;
+ }
+ }
+
+ public static class ContractITCaseOutputFormat extends FileOutputFormat {
+ private static final long serialVersionUID = 1L;
+
+ private final StringBuilder buffer = new StringBuilder();
+ private final StringValue keyString = new StringValue();
+ private final IntValue valueInteger = new IntValue();
+
+
+ public ContractITCaseOutputFormat() {
+ setWriteMode(WriteMode.OVERWRITE);
+ }
+
+ @Override
+ public void writeRecord(Record record) throws IOException {
+ this.buffer.setLength(0);
+ this.buffer.append(record.getField(0, keyString).toString());
+ this.buffer.append(' ');
+ this.buffer.append(record.getField(1, valueInteger).getValue());
+ this.buffer.append('\n');
+
+ byte[] bytes = this.buffer.toString().getBytes();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Writing out: [" + keyString.toString() + "," + valueInteger.getValue() + "]");
+
+ this.stream.write(bytes);
+ }
+ }
+}