You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:06 UTC
[27/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
deleted file mode 100644
index 8c19462..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
+++ /dev/null
@@ -1,1436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.MapNode;
-import org.apache.flink.optimizer.dag.JoinNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.junit.Test;
-import org.apache.flink.api.java.io.TextInputFormat;
-
-
-public class FeedbackPropertiesMatchTest {
-
- @Test
- public void testNoPartialSolutionFoundSingleInputOnly() {
- try {
- SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-
- SourcePlanNode otherTarget = new SourcePlanNode(getSourceNode(), "Source");
-
- Channel toMap1 = new Channel(target);
- toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-
- Channel toMap2 = new Channel(map1);
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = new LocalProperties();
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(otherTarget, gp, lp);
- assertTrue(report == NO_PARTIAL_SOLUTION);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSingleInputOperators() {
- try {
- SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-
- Channel toMap1 = new Channel(target);
- toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-
- Channel toMap2 = new Channel(map1);
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-
- // no feedback properties and none are ever required and present
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = new LocalProperties();
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some global feedback properties and none are ever required and present
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 5));
- LocalProperties lp = new LocalProperties();
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some local feedback properties and none are ever required and present
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some global and local feedback properties and none are ever required and present
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 5));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // --------------------------- requirements on channel 1 -----------------------
-
- // some required global properties, which are matched exactly
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 5));
- LocalProperties lp = new LocalProperties();
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldList(2, 5));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required local properties, which are matched exactly
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1, 2));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required global and local properties, which are matched exactly
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 5));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldList(2, 5));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1, 2));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required global and local properties, which are over-fulfilled
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldSet(2, 5));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required global properties that are not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 1));
- LocalProperties lp = new LocalProperties();
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldList(2, 5));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some required local properties that are not met
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2, 1));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some required global and local properties where the global properties are not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 1));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(2, 5));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some required global and local properties where the local properties are not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(1));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // --------------------------- requirements on channel 2 -----------------------
-
- // some required global properties, which are matched exactly
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 5));
- LocalProperties lp = new LocalProperties();
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldList(2, 5));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required local properties, which are matched exactly
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1, 2));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required global and local properties, which are matched exactly
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 5));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldList(2, 5));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1, 2));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required global and local properties, which are over-fulfilled
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldSet(2, 5));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required global properties that are not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 1));
- LocalProperties lp = new LocalProperties();
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setHashPartitioned(new FieldSet(2, 5));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some required local properties that are not met
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2, 1));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some required global and local properties where the global properties are not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(2, 1));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldSet(2, 5));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(1));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some required global and local properties where the local properties are not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(1));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // ---------------------- requirements mixed on 1 and 2 -----------------------
-
- // some required global properties at step one and some more at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.EMPTY;
-
- RequestedGlobalProperties reqGp1 = new RequestedGlobalProperties();
- reqGp1.setAnyPartitioning(new FieldList(1, 2));
-
- RequestedGlobalProperties reqGp2 = new RequestedGlobalProperties();
- reqGp2.setHashPartitioned(new FieldList(1, 2));
-
- toMap1.setRequiredGlobalProps(reqGp1);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp2);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required local properties at step one and some more at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-
- RequestedLocalProperties reqLp1 = new RequestedLocalProperties();
- reqLp1.setGroupedFields(new FieldList(3, 1));
-
- RequestedLocalProperties reqLp2 = new RequestedLocalProperties();
- reqLp2.setOrdering(new Ordering(3, null, Order.ANY).appendOrdering(1, null, Order.ANY));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(reqLp1);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(reqLp2);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required global properties at step one and some local ones at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(1, 2));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some required local properties at step one and some global ones at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(1, 2));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some fulfilled global properties at step one and some non-fulfilled local ones at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(1, 2));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2, 3));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some fulfilled local properties at step one and some non-fulfilled global ones at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(2, 3));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2, 1));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some non-fulfilled global properties at step one and some fulfilled local ones at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(2, 3));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2, 1));
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // some non-fulfilled local properties at step one and some fulfilled global ones at step 2
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldList(1, 2));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(2, 1, 3));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSingleInputOperatorsWithReCreation() {
- try {
- SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-
- Channel toMap1 = new Channel(target);
- SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-
- Channel toMap2 = new Channel(map1);
- SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-
- // set ship strategy in first channel, so later non matching global properties do not matter
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.EMPTY;
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldSet(2, 5));
-
- toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.NONE);
-
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
-
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(reqGp);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(MET, report);
- }
-
- // set ship strategy in second channel, so previous non matching global properties void the match
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.EMPTY;
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldSet(2, 5));
-
- toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.NONE);
-
- toMap2.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
-
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // set local strategy in first channel, so later non matching local properties do not matter
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(4, 1));
-
- toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
-
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(reqLp);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // set local strategy in second channel, so previous non matching local properties void the match
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(4, 1));
-
- toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.NONE);
-
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
-
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // create the properties on the same node as the requirement
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(1, 2));
- LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
-
- RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
- reqGp.setAnyPartitioning(new FieldSet(5, 7));
-
- RequestedLocalProperties reqLp = new RequestedLocalProperties();
- reqLp.setGroupedFields(new FieldList(5, 7));
-
- toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7), DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
-
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
-
- toMap1.setRequiredGlobalProps(reqGp);
- toMap1.setRequiredLocalProps(reqLp);
-
- toMap2.setRequiredGlobalProps(null);
- toMap2.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(MET, report);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Builds the chain Source -> Mapper 1 -> Mapper 2 -> Mapper 3 and places a requirement
- * on the last channel only. In both cases the first channel applies a strategy of its own
- * (a local sort, then a hash partitioning), and the report is expected to be neither
- * {@code null}, {@code NO_PARTIAL_SOLUTION} nor {@code NOT_MET}.
- */
- @Test
- public void testSingleInputOperatorsChainOfThree() {
- try {
- final SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
-
- final Channel intoFirst = new Channel(target);
- final SingleInputPlanNode firstMapper = new SingleInputPlanNode(getMapNode(), "Mapper 1", intoFirst, DriverStrategy.MAP);
-
- final Channel intoSecond = new Channel(firstMapper);
- final SingleInputPlanNode secondMapper = new SingleInputPlanNode(getMapNode(), "Mapper 2", intoSecond, DriverStrategy.MAP);
-
- final Channel intoThird = new Channel(secondMapper);
- final SingleInputPlanNode thirdMapper = new SingleInputPlanNode(getMapNode(), "Mapper 3", intoThird, DriverStrategy.MAP);
-
- // case 1: the first channel sorts locally, so the non-matching local requirement on the last channel does not matter
- {
- final GlobalProperties feedbackGlobal = new GlobalProperties();
- final Ordering feedbackOrder = new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING);
- final LocalProperties feedbackLocal = LocalProperties.forOrdering(feedbackOrder);
-
- final RequestedLocalProperties requiredLocal = new RequestedLocalProperties();
- requiredLocal.setGroupedFields(new FieldList(4, 1));
-
- final FieldList sortKeys = new FieldList(5, 7);
- final boolean[] sortDirections = new boolean[] {false, false};
-
- intoFirst.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- intoFirst.setLocalStrategy(LocalStrategy.SORT, sortKeys, sortDirections);
-
- intoSecond.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- intoSecond.setLocalStrategy(LocalStrategy.NONE);
-
- intoThird.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- intoThird.setLocalStrategy(LocalStrategy.NONE);
-
- intoFirst.setRequiredGlobalProps(null);
- intoFirst.setRequiredLocalProps(null);
-
- intoSecond.setRequiredGlobalProps(null);
- intoSecond.setRequiredLocalProps(null);
-
- intoThird.setRequiredGlobalProps(null);
- intoThird.setRequiredLocalProps(requiredLocal);
-
- final FeedbackPropertiesMeetRequirementsReport outcome = thirdMapper.checkPartialSolutionPropertiesMet(target, feedbackGlobal, feedbackLocal);
- assertFalse(outcome == null || outcome == NO_PARTIAL_SOLUTION || outcome == NOT_MET);
- }
-
- // case 2: the first channel hash-partitions, so the non-matching global requirement on the last channel does not matter
- {
- final GlobalProperties feedbackGlobal = new GlobalProperties();
- feedbackGlobal.setHashPartitioned(new FieldList(5, 3));
- final LocalProperties feedbackLocal = LocalProperties.EMPTY;
-
- final RequestedGlobalProperties requiredGlobal = new RequestedGlobalProperties();
- requiredGlobal.setAnyPartitioning(new FieldSet(2, 3));
-
- final FieldList partitionKeys = new FieldList(1, 2);
-
- intoFirst.setShipStrategy(ShipStrategyType.PARTITION_HASH, partitionKeys, DataExchangeMode.PIPELINED);
- intoFirst.setLocalStrategy(LocalStrategy.NONE);
-
- intoSecond.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- intoSecond.setLocalStrategy(LocalStrategy.NONE);
-
- intoThird.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- intoThird.setLocalStrategy(LocalStrategy.NONE);
-
- intoFirst.setRequiredGlobalProps(null);
- intoFirst.setRequiredLocalProps(null);
-
- intoSecond.setRequiredGlobalProps(null);
- intoSecond.setRequiredLocalProps(null);
-
- intoThird.setRequiredGlobalProps(requiredGlobal);
- intoThird.setRequiredLocalProps(null);
-
- final FeedbackPropertiesMeetRequirementsReport outcome = thirdMapper.checkPartialSolutionPropertiesMet(target, feedbackGlobal, feedbackLocal);
- assertFalse(outcome == null || outcome == NO_PARTIAL_SOLUTION || outcome == NOT_MET);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Joins two mapper branches that both start at ordinary sources. The node passed as the
- * partial solution is not connected to the join, so the check is expected to report
- * {@code NO_PARTIAL_SOLUTION}.
- */
- @Test
- public void testNoPartialSolutionFoundTwoInputOperator() {
- try {
- // deliberately never wired into the plan below
- final SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
-
- final SourcePlanNode leftSource = new SourcePlanNode(getSourceNode(), "Source 1");
- final SourcePlanNode rightSource = new SourcePlanNode(getSourceNode(), "Source 2");
-
- // left branch: Source 1 -> Mapper 1
- final Channel intoLeftMapper = new Channel(leftSource);
- intoLeftMapper.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- intoLeftMapper.setLocalStrategy(LocalStrategy.NONE);
- final SingleInputPlanNode leftMapper = new SingleInputPlanNode(getMapNode(), "Mapper 1", intoLeftMapper, DriverStrategy.MAP);
-
- // right branch: Source 2 -> Mapper 2
- final Channel intoRightMapper = new Channel(rightSource);
- intoRightMapper.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- intoRightMapper.setLocalStrategy(LocalStrategy.NONE);
- final SingleInputPlanNode rightMapper = new SingleInputPlanNode(getMapNode(), "Mapper 2", intoRightMapper, DriverStrategy.MAP);
-
- final Channel leftJoinInput = new Channel(leftMapper);
- final Channel rightJoinInput = new Channel(rightMapper);
-
- leftJoinInput.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- leftJoinInput.setLocalStrategy(LocalStrategy.NONE);
- rightJoinInput.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- rightJoinInput.setLocalStrategy(LocalStrategy.NONE);
-
- final DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", leftJoinInput, rightJoinInput, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-
- final GlobalProperties feedbackGlobal = new GlobalProperties();
- final LocalProperties feedbackLocal = new LocalProperties();
- final FeedbackPropertiesMeetRequirementsReport outcome = join.checkPartialSolutionPropertiesMet(target, feedbackGlobal, feedbackLocal);
- assertEquals(NO_PARTIAL_SOLUTION, outcome);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTwoOperatorsOneIndependent() {
- try {
- SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
- SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Other Source");
- /*
- * Plan under test:
- *   target ("Partial Solution") -> map1 -> join (input 1)
- *   source ("Other Source")     -> map2 -> join (input 2)
- *   join -> afterJoin
- * Only the first join input is fed by the node that is passed as the partial solution.
- * The channels are shared by all cases below, so settings made in one case stay in
- * effect for later cases until they are overwritten.
- */
- Channel toMap1 = new Channel(target);
- toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-
- Channel toMap2 = new Channel(source);
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-
- Channel toJoin1 = new Channel(map1);
- Channel toJoin2 = new Channel(map2);
-
- DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-
- Channel toAfterJoin = new Channel(join);
- toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
-
- // attach some properties to the non-relevant input (the branch coming from "Other Source");
- // toMap2 and toJoin2 are not touched again by any of the cases below
- {
- toMap2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(2, 7), new boolean[] {true, true});
-
- RequestedGlobalProperties joinGp = new RequestedGlobalProperties();
- joinGp.setFullyReplicated();
-
- RequestedLocalProperties joinLp = new RequestedLocalProperties();
- joinLp.setOrdering(new Ordering(2, null, Order.ASCENDING).appendOrdering(7, null, Order.ASCENDING));
-
- toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin2.setLocalStrategy(LocalStrategy.NONE);
- toJoin2.setRequiredGlobalProps(joinGp);
- toJoin2.setRequiredLocalProps(joinLp);
- }
-
- // ------------------------------------------------------------------------------------
-
- // no properties from the partial solution, no required properties
- // (expected: any report other than null, NO_PARTIAL_SOLUTION or NOT_MET)
- {
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.NONE);
-
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.EMPTY;
-
- FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some properties from the partial solution, no required properties
- {
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.NONE);
-
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
-
- // produced properties match relevant input
- // (hash partitioning on field 0 and a grouping on field 2 are required on toJoin1)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(2));
-
- toJoin1.setRequiredGlobalProps(rgp);
- toJoin1.setRequiredLocalProps(rlp);
-
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.NONE);
-
- FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // produced properties do not match relevant input
- // (the required grouping on fields 1, 2, 3 differs from the produced grouping on 2, 1)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(1, 2, 3));
-
- toJoin1.setRequiredGlobalProps(rgp);
- toJoin1.setRequiredLocalProps(rlp);
-
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.NONE);
-
- FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // produced properties overridden before join
- // (toJoin1 hash-partitions and sorts by itself; the requirements sit on toMap1 and equal the produced properties)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(2, 1));
-
- toMap1.setRequiredGlobalProps(rgp);
- toMap1.setRequiredLocalProps(rlp);
-
- toJoin1.setRequiredGlobalProps(null);
- toJoin1.setRequiredLocalProps(null);
-
- toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-
- FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(MET, report);
- }
-
- // produced properties before join match, after join match as well
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(2, 1));
-
- toMap1.setRequiredGlobalProps(null);
- toMap1.setRequiredLocalProps(null);
-
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.NONE);
-
- toJoin1.setRequiredGlobalProps(rgp);
- toJoin1.setRequiredLocalProps(rlp);
-
- toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-
- toAfterJoin.setRequiredGlobalProps(rgp);
- toAfterJoin.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp); // NOTE(review): the check starts at join, not afterJoin, so the requirements just set on toAfterJoin are presumably not exercised here - confirm whether afterJoin was intended
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // produced properties before join match, after join do not match
- // (toJoin1 keeps the FORWARD / NONE strategies set by the previous case)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
- rgp1.setHashPartitioned(new FieldList(0));
-
- RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
- rgp2.setHashPartitioned(new FieldList(3));
-
- RequestedLocalProperties rlp1 = new RequestedLocalProperties();
- rlp1.setGroupedFields(new FieldList(2, 1));
-
- RequestedLocalProperties rlp2 = new RequestedLocalProperties();
- rlp2.setGroupedFields(new FieldList(3, 4));
-
- toJoin1.setRequiredGlobalProps(rgp1);
- toJoin1.setRequiredLocalProps(rlp1);
-
- toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
-
- toAfterJoin.setRequiredGlobalProps(rgp2);
- toAfterJoin.setRequiredLocalProps(rlp2);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // produced properties are overridden, does not matter that they do not match
- // (toJoin1 hash-partitions and sorts; the mismatching requirements sit on toAfterJoin)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setAnyPartitioning(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(1));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(1, 2, 3));
-
- toJoin1.setRequiredGlobalProps(null);
- toJoin1.setRequiredLocalProps(null);
-
- toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-
- toAfterJoin.setRequiredGlobalProps(rgp);
- toAfterJoin.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(MET, report);
- }
-
- // local property overridden before join, local property mismatch after join not relevant
- // (toJoin1 forwards but sorts locally; only a local requirement is set on toAfterJoin)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setAnyPartitioning(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(1, 2, 3));
-
- toJoin1.setRequiredGlobalProps(null);
- toJoin1.setRequiredLocalProps(null);
-
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-
- toAfterJoin.setRequiredGlobalProps(null);
- toAfterJoin.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // local property overridden before join, but a global property mismatch after the join voids the match
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setAnyPartitioning(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(1));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(1, 2, 3));
-
- toJoin1.setRequiredGlobalProps(null);
- toJoin1.setRequiredLocalProps(null);
-
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
-
- toAfterJoin.setRequiredGlobalProps(rgp);
- toAfterJoin.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Checks feedback property matching for a join whose two inputs both originate from the
- * partial solution:
- * <pre>
- *   target -> map1 -> join (input 1)
- *   target -> map2 -> join (input 2)
- *   join -> afterJoin
- * </pre>
- * The channels are shared by all cases, so strategies and requirements set in one case
- * remain in effect for the following cases until they are overwritten.
- */
- @Test
- public void testTwoOperatorsBothDependent() {
- try {
- SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
-
- Channel toMap1 = new Channel(target);
- toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap1.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
-
- Channel toMap2 = new Channel(target);
- toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toMap2.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
-
- Channel toJoin1 = new Channel(map1);
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.NONE);
-
- Channel toJoin2 = new Channel(map2);
- toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin2.setLocalStrategy(LocalStrategy.NONE);
-
- DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-
- Channel toAfterJoin = new Channel(join);
- toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
- SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
-
- // no properties from the partial solution, no required properties
- // (expected: any report other than null, NO_PARTIAL_SOLUTION or NOT_MET)
- {
- GlobalProperties gp = new GlobalProperties();
- LocalProperties lp = LocalProperties.EMPTY;
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // some properties from the partial solution, no required properties
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // test requirements on one input and met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(2, 1));
-
- toJoin1.setRequiredGlobalProps(rgp);
- toJoin1.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // test requirements on both input and met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(2, 1));
-
- toJoin1.setRequiredGlobalProps(rgp);
- toJoin1.setRequiredLocalProps(rlp);
-
- toJoin2.setRequiredGlobalProps(rgp);
- toJoin2.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
- }
-
- // test requirements on both inputs, one not met
- // (the requirements on toJoin2 differ from the produced properties)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
- rgp1.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp1 = new RequestedLocalProperties();
- rlp1.setGroupedFields(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
- rgp2.setHashPartitioned(new FieldList(1));
-
- RequestedLocalProperties rlp2 = new RequestedLocalProperties();
- rlp2.setGroupedFields(new FieldList(0, 3));
-
- toJoin1.setRequiredGlobalProps(rgp1);
- toJoin1.setRequiredLocalProps(rlp1);
-
- toJoin2.setRequiredGlobalProps(rgp2);
- toJoin2.setRequiredLocalProps(rlp2);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // test override on both inputs, later requirement ignored
- // (both join inputs use a non-forward ship strategy: hash partitioning and broadcast)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(1));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(0, 3));
-
- toJoin1.setRequiredGlobalProps(null);
- toJoin1.setRequiredLocalProps(null);
-
- toJoin2.setRequiredGlobalProps(null);
- toJoin2.setRequiredLocalProps(null);
-
- toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
- toJoin2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
-
- toAfterJoin.setRequiredGlobalProps(rgp);
- toAfterJoin.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(MET, report);
- }
-
- // test override on one input, later requirement met
- // (only toJoin1 repartitions; toJoin2 forwards, and the report is expected to stay PENDING)
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(0));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(2, 1));
-
- toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
- toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-
- toAfterJoin.setRequiredGlobalProps(rgp);
- toAfterJoin.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(PENDING, report);
- }
-
- // test override on one input, later requirement not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(3));
-
- RequestedLocalProperties rlp = new RequestedLocalProperties();
- rlp.setGroupedFields(new FieldList(77, 69));
-
- toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
- toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-
- toAfterJoin.setRequiredGlobalProps(rgp);
- toAfterJoin.setRequiredLocalProps(rlp);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
-
- // test override on one input locally, later global requirement not met
- {
- GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(new FieldList(0));
- LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
-
- RequestedGlobalProperties rgp = new RequestedGlobalProperties();
- rgp.setHashPartitioned(new FieldList(3));
-
- // input 1 forwards but sorts locally: this is the local override under test
- toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(3), new boolean[] { false });
-
- // input 2 is a plain forward without a local strategy; configuring toJoin2 here
- // (rather than toJoin1 a second time) keeps the sort on input 1 in place
- toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
- toJoin2.setLocalStrategy(LocalStrategy.NONE);
-
- toAfterJoin.setRequiredGlobalProps(rgp);
- toAfterJoin.setRequiredLocalProps(null);
-
- FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
- assertEquals(NOT_MET, report);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static DataSourceNode getSourceNode() {
- return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
- }
-
- private static MapNode getMapNode() {
- return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
- }
-
- private static JoinNode getJoinNode() {
- return new JoinNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
deleted file mode 100644
index 77d185d..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This test case has been created to validate that correct strategies are used if orders within groups are
- * requested.
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class GroupOrderTest extends CompilerTestBase {
-
- @Test
- public void testReduceWithGroupOrder() {
- // construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-
- ReduceOperator reduce = ReduceOperator.builder(new IdentityReduce()).keyField(IntValue.class, 2).name("Reduce").input(source).build();
- Ordering groupOrder = new Ordering(5, StringValue.class, Order.DESCENDING);
- reduce.setGroupOrder(groupOrder);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, reduce, "Sink");
-
-
- Plan plan = new Plan(sink, "Test Temp Task");
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
- OptimizedPlan oPlan;
- try {
- oPlan = compileNoStats(plan);
- } catch(CompilerException ce) {
- ce.printStackTrace();
- fail("The pact compiler is unable to compile this plan correctly.");
- return; // silence the compiler
- }
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- SinkPlanNode sinkNode = resolver.getNode("Sink");
- SingleInputPlanNode reducer = resolver.getNode("Reduce");
-
- // verify the strategies
- Assert.assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-
- Channel c = reducer.getInput();
- Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
-
- FieldList ship = new FieldList(2);
- FieldList local = new FieldList(2, 5);
- Assert.assertEquals(ship, c.getShipStrategyKeys());
- Assert.assertEquals(local, c.getLocalStrategyKeys());
- Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]);
-
- // check that we indeed sort descending
- Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]);
- }
-
- @Test
- public void testCoGroupWithGroupOrder() {
- // construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source1");
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source2");
-
- CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6)
- .keyField(LongValue.class, 0, 0)
- .name("CoGroup").input1(source1).input2(source2).build();
-
- Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING);
- Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING);
- groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING);
- coGroup.setGroupOrderForInputOne(groupOrder1);
- coGroup.setGroupOrderForInputTwo(groupOrder2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink");
-
- Plan plan = new Plan(sink, "Reduce Group Order Test");
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
- OptimizedPlan oPlan;
- try {
- oPlan = compileNoStats(plan);
- } catch(CompilerException ce) {
- ce.printStackTrace();
- fail("The pact compiler is unable to compile this plan correctly.");
- return; // silence the compiler
- }
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- SinkPlanNode sinkNode = resolver.getNode("Sink");
- DualInputPlanNode coGroupNode = resolver.getNode("CoGroup");
-
- // verify the strategies
- Assert.assertEquals(ShipStrategyType.FORWARD, sinkNode.getInput().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput1().getShipStrategy());
- Assert.assertEquals(ShipStrategyType.PARTITION_HASH, coGroupNode.getInput2().getShipStrategy());
-
- Channel c1 = coGroupNode.getInput1();
- Channel c2 = coGroupNode.getInput2();
-
- Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy());
- Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy());
-
- FieldList ship1 = new FieldList(new int[] {3, 0});
- FieldList ship2 = new FieldList(new int[] {6, 0});
-
- FieldList local1 = new FieldList(new int[] {3, 0, 5});
- FieldList local2 = new FieldList(new int[] {6, 0, 1, 4});
-
- Assert.assertEquals(ship1, c1.getShipStrategyKeys());
- Assert.assertEquals(ship2, c2.getShipStrategyKeys());
- Assert.assertEquals(local1, c1.getLocalStrategyKeys());
- Assert.assertEquals(local2, c2.getLocalStrategyKeys());
-
- Assert.assertTrue(c1.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]);
- Assert.assertTrue(c1.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
- Assert.assertTrue(c2.getLocalStrategySortOrder()[0] == coGroupNode.getSortOrders()[0]);
- Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
-
- // check that the local group orderings are correct
- Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]);
- Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]);
- Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
deleted file mode 100644
index 6dadc19..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.junit.Test;
-
-/**
- * Regression tests for program plans that once failed to compile because of a bug:
- * <ul>
- *   <li>Ticket 158</li>
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class HardPlansCompilationTest extends CompilerTestBase {
-
- /**
-  * Builds the plan from ticket 158, optimizes it and translates it into a job graph.
-  * The test passes when neither step throws.
-  *
-  * <pre>
-  * Source -> Map -> Reduce -> Cross -> Reduce -> Cross -> Reduce -> Sink
-  *   |                         /                  /
-  *   |------------------------/                  /
-  *   |------------------------------------------/
-  * </pre>
-  *
-  * The source is consumed three times: by the map and as the second input of both crosses.
-  * The original note on this test says the first cross has the SameKeyFirst output contract;
-  * nothing in this method configures that explicitly.
-  */
- @Test
- public void testTicket158() {
-     // the single source feeds the map and, further down, both crosses
-     FileDataSource input = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-
-     MapOperator mapper = MapOperator.builder(new IdentityMap())
-             .name("Map1")
-             .input(input)
-             .build();
-
-     ReduceOperator firstReduce = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-             .name("Reduce1")
-             .input(mapper)
-             .build();
-
-     CrossOperator firstCross = CrossOperator.builder(new DummyCrossStub())
-             .name("Cross1")
-             .input1(firstReduce)
-             .input2(input)
-             .build();
-
-     ReduceOperator secondReduce = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-             .name("Reduce2")
-             .input(firstCross)
-             .build();
-
-     CrossOperator secondCross = CrossOperator.builder(new DummyCrossStub())
-             .name("Cross2")
-             .input1(secondReduce)
-             .input2(input)
-             .build();
-
-     ReduceOperator thirdReduce = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-             .name("Reduce3")
-             .input(secondCross)
-             .build();
-
-     FileDataSink output = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-     output.setInput(thirdReduce);
-
-     Plan program = new Plan(output, "Test Temp Task");
-     program.setDefaultParallelism(DEFAULT_PARALLELISM);
-
-     // optimize without statistics, then make sure the job graph can be generated
-     OptimizedPlan optimized = compileNoStats(program);
-     new JobGraphGenerator().compileJobGraph(optimized);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
deleted file mode 100644
index ac4f820..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-
-
-@SuppressWarnings({"serial", "unchecked"})
-public class IterationsCompilerTest extends CompilerTestBase {
-
- /**
-  * Compiles a delta iteration in which the workset is only used as a broadcast variable of a
-  * mapper over an input defined outside the iteration, and the mapper's output is joined with
-  * the solution set. The optimized plan must be accepted by both the JSON dump generator and
-  * the job graph generator.
-  */
- @Test
- public void testSolutionSetDeltaDependsOnBroadcastVariable() {
-     try {
-         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-         DataSet<Tuple2<Long, Long>> initialData =
-                 env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
-
-         DataSet<Tuple2<Long, Long>> outsideInput =
-                 env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());
-
-         // the same data set serves as initial solution set and initial workset
-         DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration =
-                 initialData.iterateDelta(initialData, 1000, 1);
-
-         // the workset enters the step function only through the broadcast set
-         DataSet<Tuple2<Long, Long>> withBroadcast = outsideInput
-                 .map(new IdentityMapper<Tuple2<Long, Long>>())
-                 .withBroadcastSet(deltaIteration.getWorkset(), "bc data");
-
-         DataSet<Tuple2<Long, Long>> joined = withBroadcast
-                 .join(deltaIteration.getSolutionSet()).where(0).equalTo(1)
-                 .projectFirst(1).projectSecond(1);
-
-         DataSet<Tuple2<Long, Long>> solutionSetDelta = joined.map(new IdentityMapper<Tuple2<Long,Long>>());
-         deltaIteration.closeWith(solutionSetDelta, joined).print();
-
-         OptimizedPlan optimized = compileNoStats(env.createProgramPlan());
-
-         // the JSON generator must accept this plan
-         new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimized);
-
-         // the JobGraphGenerator must accept this plan as well
-         new JobGraphGenerator().compileJobGraph(optimized);
-     }
-     catch (Exception e) {
-         e.printStackTrace();
-         fail(e.getMessage());
-     }
- }
-
- @Test
- public void testTwoIterationsWithMapperInbetween() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
-
- DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
-
- depResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-
- new JobGraphGenerator().compileJobGraph(op);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTwoIterationsDirectlyChained() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
-
- depResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-
- new JobGraphGenerator().compileJobGraph(op);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTwoWorksetIterationsDirectlyChained() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
-
- secondResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
-
- new JobGraphGenerator().compileJobGraph(op);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testIterationPushingWorkOut() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-
- DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
-
- doBulkIteration(input1, input2).print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
-
- BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- // check that work has not! been pushed out, as the end of the step function does not produce the necessary properties
- for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
- assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
- }
-
- new JobGraphGenerator().compileJobGraph(op);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
-  * Compiles a program in which the initial workset of a delta iteration is also joined with
-  * the iteration's result. The test passes when the optimizer compiles the plan without
-  * throwing.
-  */
- @Test
- public void testWorksetIterationPipelineBreakerPlacement() {
-     try {
-         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-         env.setDegreeOfParallelism(8);
-
-         // the workset (input two of the delta iteration) is the same data set that is
-         // consumed by the join following the iteration
-         DataSet<Tuple2<Long, Long>> worksetInput =
-                 env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-
-         DataSet<Tuple2<Long, Long>> solutionSetInput =
-                 env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
-
-         // the step function is trivial; only the inputs to the iteration are of interest
-         DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration =
-                 solutionSetInput.iterateDelta(worksetInput, 100, 0);
-
-         DataSet<Tuple2<Long, Long>> stepOutput =
-                 deltaIteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
-
-         DataSet<Tuple2<Long, Long>> iterationResult = deltaIteration.closeWith(stepOutput, stepOutput);
-
-         worksetInput
-                 .join(iterationResult, JoinHint.REPARTITION_HASH_FIRST)
-                 .where(0).equalTo(0)
-                 .print();
-
-         compileNoStats(env.createProgramPlan());
-     }
-     catch (Exception e) {
-         e.printStackTrace();
-         fail(e.getMessage());
-     }
- }
-
- /**
-  * Compiles a bulk iteration whose partial solution is consumed by three separate filters.
-  * One filter branch is mapped and joined with another, and the join result is broadcast to
-  * a mapper on the third branch. The optimized plan must be accepted by the job graph
-  * generator.
-  */
- @Test
- public void testResetPartialSolution() {
-     try {
-         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-         DataSet<Long> initialWidth = env.generateSequence(1, 10);
-         DataSet<Long> initialUpdate = env.generateSequence(1, 10);
-         DataSet<Long> initialLastGradient = env.generateSequence(1, 10);
-
-         DataSet<Long> init = initialWidth.union(initialUpdate).union(initialLastGradient);
-
-         IterativeDataSet<Long> iteration = init.iterate(10);
-
-         // three independent consumers of the partial solution
-         DataSet<Long> width = iteration.filter(new IdFilter<Long>());
-         DataSet<Long> update = iteration.filter(new IdFilter<Long>());
-         DataSet<Long> lastGradient = iteration.filter(new IdFilter<Long>());
-
-         DataSet<Long> gradient = width.map(new IdentityMapper<Long>());
-
-         DataSet<Long> term = gradient
-                 .join(lastGradient)
-                 .where(new IdentityKeyExtractor<Long>())
-                 .equalTo(new IdentityKeyExtractor<Long>())
-                 .with(new JoinFunction<Long, Long, Long>() {
-                     public Long join(Long first, Long second) { return null; }
-                 });
-
-         // the join result reaches the mapper only as a broadcast set
-         DataSet<Long> updateWithTerm = update
-                 .map(new RichMapFunction<Long, Long>() {
-                     public Long map(Long value) { return null; }
-                 })
-                 .withBroadcastSet(term, "some-name");
-
-         DataSet<Long> nextPartialSolution = width.union(updateWithTerm).union(lastGradient);
-         iteration.closeWith(nextPartialSolution).print();
-
-         OptimizedPlan optimized = compileNoStats(env.createProgramPlan());
-
-         new JobGraphGenerator().compileJobGraph(optimized);
-     }
-     catch (Exception e) {
-         e.printStackTrace();
-         fail(e.getMessage());
-     }
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Wraps a bulk iteration with at most 20 iterations around {@code vertices}.
-  *
-  * <p>The step function joins the partial solution with {@code edges} on field 0, takes the
-  * minimum of field 1 per value of field 0, joins that with the partial solution again on
-  * field 0 and passes the pairs through {@link FlatMapJoin}.
-  *
-  * @param vertices the initial partial solution
-  * @param edges the data set joined with the partial solution in every step
-  * @return the result of the closed bulk iteration
-  */
- public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
-     // open the bulk iteration
-     IterativeDataSet<Tuple2<Long, Long>> loop = vertices.iterate(20);
-
-     // minimum of field 1 for every value of field 0
-     DataSet<Tuple2<Long, Long>> minima = loop
-             .join(edges).where(0).equalTo(0).with(new Join222())
-             .groupBy(0).aggregate(Aggregations.MIN, 1);
-
-     DataSet<Tuple2<Long, Long>> nextPartialSolution = minima
-             .join(loop).where(0).equalTo(0)
-             .flatMap(new FlatMapJoin());
-
-     // close the bulk iteration
-     return loop.closeWith(nextPartialSolution);
- }
-
-
- /**
-  * Wraps a delta iteration with at most 100 iterations and key field 0 around
-  * {@code vertices}, which serves as both the initial solution set and the initial workset.
-  *
-  * <p>The step function joins the workset with {@code edges}, group-reduces the projected
-  * result, joins with {@code edges} again, and joins twice with the solution set. The final
-  * data set is used as both solution set delta and next workset.
-  *
-  * @param vertices the initial solution set and workset
-  * @param edges the data set joined inside the step function
-  * @return the result of the closed delta iteration
-  */
- public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
-     DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> delta = vertices.iterateDelta(vertices, 100, 0);
-
-     DataSet<Tuple1<Long>> projected = delta.getWorkset()
-             .join(edges).where(0).equalTo(0)
-             .projectSecond(1);
-
-     DataSet<Tuple1<Long>> reduced = projected.groupBy(0).reduceGroup(new Reduce101());
-
-     DataSet<Tuple2<Long, Long>> rejoinedWithEdges = reduced
-             .join(edges).where(0).equalTo(1)
-             .projectSecond(0, 1);
-
-     DataSet<Tuple2<Long, Long>> minima = rejoinedWithEdges
-             .join(delta.getSolutionSet()).where(0).equalTo(0)
-             .with(new Join222())
-             .groupBy(0).aggregate(Aggregations.MIN, 1);
-
-     DataSet<Tuple2<Long, Long>> updates = minima
-             .join(delta.getSolutionSet()).where(0).equalTo(0)
-             .flatMap(new FlatMapJoin());
-
-     // the same data set is the solution set delta and the next workset
-     return delta.closeWith(updates, updates);
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Join function stub over two {@code Tuple2<Long, Long>} inputs producing a
-  * {@code Tuple2<Long, Long>}. Its {@code join} method ignores both arguments and always
-  * returns {@code null}.
-  */
- public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
- @Override
- public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
- return null;
- }
- }
-
- /**
-  * Flat-map function stub that consumes a pair of {@code Tuple2<Long, Long>} values.
-  * Its {@code flatMap} body is empty, so it never emits anything to the collector.
-  */
- public static final class FlatMapJoin extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-
- @Override
- public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
- }
-
- /**
-  * Map function over {@code Tuple2<Long, Long>} that returns its input unchanged.
-  */
- public static final class DummyMap extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
- @Override
- public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
- return value;
- }
- }
-
- /**
-  * Group-reduce function stub over {@code Tuple1<Long>}. Its {@code reduce} body is empty,
-  * so it never emits anything to the collector. The class is annotated with
-  * {@code @ForwardedFields("0")}.
-  */
- @ForwardedFields("0")
- public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
-
- @Override
- public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
- }
-
- /**
-  * Map function that turns a one-field tuple into a two-field tuple holding the input's
-  * single field in both positions. The class is annotated with {@code @ForwardedFields("0")}.
-  */
- @ForwardedFields("0")
- public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
-
-     @Override
-     public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
-         final Long element = value.f0;
-         return new Tuple2<Long, Long>(element, element);
-     }
- }
-
- /**
-  * Map function that wraps a single value into a {@code Tuple2} holding that value in both
-  * fields.
-  *
-  * @param <T> the type of the mapped value
-  */
- public static final class DuplicateValueScalar<T> extends RichMapFunction<T, Tuple2<T, T>> {
-
- @Override
- public Tuple2<T, T> map(T value) {
- return new Tuple2<T, T>(value, value);
- }
- }
-
- /**
-  * Filter function that accepts every element: {@code filter} always returns {@code true}.
-  *
-  * @param <T> the type of the filtered elements
-  */
- public static final class IdFilter<T> implements FilterFunction<T> {
- @Override
- // the argument is never inspected
- public boolean filter(T value) {
- return true;
- }
- }
-}