You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/21 16:58:03 UTC
[1/2] incubator-flink git commit: [APIs] Enhance test coverage for
CollectionInputFormat and add tests for failed serializations of user code
objects
Repository: incubator-flink
Updated Branches:
refs/heads/master 02e2857a9 -> cf54a1c2a
[APIs] Enhance test coverage for CollectionInputFormat and add tests for failed serializations of user code objects
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d7853fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d7853fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d7853fd3
Branch: refs/heads/master
Commit: d7853fd3157310cae2e66dfc0e9ff146905bdb33
Parents: 02e2857
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 15:25:21 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 21 16:56:26 2014 +0100
----------------------------------------------------------------------
.../flink/util/InstantiationUtilTest.java | 86 ++++++++
.../api/java/io/CollectionInputFormat.java | 7 +-
.../api/java/io/CollectionInputFormatTest.java | 202 ++++++++++++++++---
.../flink/api/java/io/CsvInputFormatTest.java | 7 +-
4 files changed, 267 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
index bf4fc8c..aded919 100644
--- a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
@@ -19,17 +19,21 @@
package org.apache.flink.util;
import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Test;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class InstantiationUtilTest {
@@ -79,6 +83,88 @@ public class InstantiationUtilTest {
assertEquals("Serialized record is not equal after serialization.", toSerialize, deserialized);
}
+
+ @Test
+ public void testWriteToConfigFailingSerialization() {
+ try {
+ final String key1 = "testkey1";
+ final String key2 = "testkey2";
+ final Configuration config = new Configuration();
+
+ try {
+ InstantiationUtil.writeObjectToConfig(new TestClassWriteFails(), config, "irgnored");
+ fail("should throw an exception");
+ }
+ catch (TestException e) {
+ // expected
+ }
+ catch (Exception e) {
+ fail("Wrong exception type - exception not properly forwarded");
+ }
+
+ InstantiationUtil.writeObjectToConfig(new TestClassReadFails(), config, key1);
+ InstantiationUtil.writeObjectToConfig(new TestClassReadFailsCNF(), config, key2);
+
+ try {
+ InstantiationUtil.readObjectFromConfig(config, key1, getClass().getClassLoader());
+ fail("should throw an exception");
+ }
+ catch (TestException e) {
+ // expected
+ }
+ catch (Exception e) {
+ fail("Wrong exception type - exception not properly forwarded");
+ }
+
+ try {
+ InstantiationUtil.readObjectFromConfig(config, key2, getClass().getClassLoader());
+ fail("should throw an exception");
+ }
+ catch (ClassNotFoundException e) {
+ // expected
+ }
+ catch (Exception e) {
+ fail("Wrong exception type - exception not properly forwarded");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ // --------------------------------------------------------------------------------------------
+
private class TestClass {}
+
+ private static class TestException extends IOException{
+ private static final long serialVersionUID = 1L;
+ }
+
+ private static class TestClassWriteFails implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ throw new TestException();
+ }
+ }
+
+ private static class TestClassReadFails implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ throw new TestException();
+ }
+ }
+
+ private static class TestClassReadFailsCNF implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ throw new ClassNotFoundException("test exception");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 89adf96..b999ede 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -80,8 +80,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
out.defaultWriteObject();
out.writeInt(dataSet.size());
+ OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
for (T element : dataSet){
- serializer.serialize(element, new OutputViewObjectOutputStreamWrapper(out));
+ serializer.serialize(element, wrapper);
}
}
@@ -91,10 +92,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
-
+ InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
for (int i = 0; i < collectionLength; i++){
T element = serializer.createInstance();
- element = serializer.deserialize(element, new InputViewObjectInputStreamWrapper(in));
+ element = serializer.deserialize(element, wrapper);
list.add(element);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 11a018c..948d22f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
import static org.junit.Assert.assertEquals;
@@ -26,8 +25,11 @@ import static org.junit.Assert.fail;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -38,11 +40,13 @@ import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
public class CollectionInputFormatTest {
- public static class ElementType{
- private int id;
+
+ public static class ElementType {
+ private final int id;
public ElementType(){
this(-1);
@@ -52,37 +56,43 @@ public class CollectionInputFormatTest {
this.id = id;
}
- public int getId(){return id;}
+ public int getId() {
+ return id;
+ }
@Override
- public boolean equals(Object obj){
- if(obj != null && obj instanceof ElementType){
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof ElementType) {
ElementType et = (ElementType) obj;
-
return et.getId() == this.getId();
- }else {
+ } else {
return false;
}
}
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
}
@Test
- public void testSerializability(){
- Collection<ElementType> inputCollection = new ArrayList<ElementType>();
- ElementType element1 = new ElementType(1);
- ElementType element2 = new ElementType(2);
- ElementType element3 = new ElementType(3);
- inputCollection.add(element1);
- inputCollection.add(element2);
- inputCollection.add(element3);
-
- @SuppressWarnings("unchecked")
- TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
-
- CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
- info.createSerializer());
-
- try{
+ public void testSerializability() {
+ try {
+ Collection<ElementType> inputCollection = new ArrayList<ElementType>();
+ ElementType element1 = new ElementType(1);
+ ElementType element2 = new ElementType(2);
+ ElementType element3 = new ElementType(3);
+ inputCollection.add(element1);
+ inputCollection.add(element2);
+ inputCollection.add(element3);
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
+
+ CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
+ info.createSerializer());
+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(buffer);
@@ -108,10 +118,10 @@ public class CollectionInputFormatTest {
assertEquals(expectedElement, actualElement);
}
- }catch(IOException ex){
- fail(ex.toString());
- }catch(ClassNotFoundException ex){
- fail(ex.toString());
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ fail(e.toString());
}
}
@@ -157,7 +167,6 @@ public class CollectionInputFormatTest {
};
try {
-
List<String> inputCollection = Arrays.asList(data);
CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer());
@@ -190,4 +199,139 @@ public class CollectionInputFormatTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testSerializationFailure() {
+ try {
+ // a mock serializer that fails when writing
+ CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
+ Collections.singleton(new ElementType()), new TestSerializer(false, true));
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buffer);
+
+ try {
+ out.writeObject(inFormat);
+ fail("should throw an exception");
+ }
+ catch (TestException e) {
+ // expected
+ }
+ catch (Exception e) {
+ fail("Exception not properly forwarded");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDeserializationFailure() {
+ try {
+ // a mock serializer that fails when reading
+ CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
+ Collections.singleton(new ElementType()), new TestSerializer(true, false));
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buffer);
+ out.writeObject(inFormat);
+ out.close();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toByteArray());
+ ObjectInputStream in = new ObjectInputStream(bais);
+
+ try {
+ in.readObject();
+ fail("should throw an exception");
+ }
+ catch (TestException e) {
+ // expected
+ }
+ catch (Exception e) {
+ fail("Exception not properly forwarded");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static class TestException extends IOException{
+ private static final long serialVersionUID = 1L;
+ }
+
+ private static class TestSerializer extends TypeSerializer<ElementType> {
+
+ private static final long serialVersionUID = 1L;
+
+ private boolean failOnRead;
+ private boolean failOnWrite;
+
+ public TestSerializer(boolean failOnRead, boolean failOnWrite) {
+ this.failOnRead = failOnRead;
+ this.failOnWrite = failOnWrite;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public boolean isStateful() {
+ return false;
+ }
+
+ @Override
+ public ElementType createInstance() {
+ return new ElementType();
+ }
+
+ @Override
+ public ElementType copy(ElementType from) {
+ return from;
+ }
+
+ @Override
+ public ElementType copy(ElementType from, ElementType reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return 4;
+ }
+
+ @Override
+ public void serialize(ElementType record, DataOutputView target) throws IOException {
+ if (failOnWrite) {
+ throw new TestException();
+ }
+ target.writeInt(record.getId());
+ }
+
+ @Override
+ public ElementType deserialize(DataInputView source) throws IOException {
+ if (failOnRead) {
+ throw new TestException();
+ }
+ return new ElementType(source.readInt());
+ }
+
+ @Override
+ public ElementType deserialize(ElementType reuse, DataInputView source) throws IOException {
+ if (failOnRead) {
+ throw new TestException();
+ }
+ return new ElementType(source.readInt());
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeInt(source.readInt());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 0662aa6..5f10a2b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -20,6 +20,7 @@
package org.apache.flink.api.java.io;
import com.google.common.base.Charsets;
+
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -399,7 +400,8 @@ public class CsvInputFormatTest {
Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();
- Tuple5[] expectedLines = new Tuple5[]{
+ @SuppressWarnings("unchecked")
+ Tuple5<Integer, String, String, String, Double>[] expectedLines = new Tuple5[] {
new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
@@ -408,8 +410,7 @@ public class CsvInputFormatTest {
};
try {
-
- for (Tuple5 expected : expectedLines) {
+ for (Tuple5<Integer, String, String, String, Double> expected : expectedLines) {
result = format.nextRecord(result);
assertEquals(expected, result);
}
[2/2] incubator-flink git commit: [FLINK-1263] [optimizer] Implement
compatibility checks for binary operators and custom partitioning
Posted by se...@apache.org.
[FLINK-1263] [optimizer] Implement compatibility checks for binary operators and custom partitioning
This closes #223
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cf54a1c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cf54a1c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cf54a1c2
Branch: refs/heads/master
Commit: cf54a1c2af1e228f83609903ba288476c5f05fee
Parents: d7853fd
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 16:09:40 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 21 16:56:35 2014 +0100
----------------------------------------------------------------------
.../apache/flink/compiler/dag/TwoInputNode.java | 23 ++-
.../compiler/dag/WorksetIterationNode.java | 7 +-
.../operators/AbstractJoinDescriptor.java | 19 ++-
.../operators/BinaryUnionOpDescriptor.java | 7 +
.../operators/CartesianProductDescriptor.java | 6 +
.../compiler/operators/CoGroupDescriptor.java | 14 +-
.../CrossBlockOuterFirstDescriptor.java | 5 +-
.../CrossBlockOuterSecondDescriptor.java | 5 +-
.../CrossStreamOuterFirstDescriptor.java | 5 +-
.../CrossStreamOuterSecondDescriptor.java | 5 +-
.../operators/OperatorDescriptorDual.java | 3 +
.../operators/UtilSinkJoinOpDescriptor.java | 9 +-
...naryCustomPartitioningCompatibilityTest.java | 84 ++++++++++
.../compiler/java/PartitioningOperatorTest.java | 70 ++++++++
.../operators/CoGroupCompatibilityTest.java | 161 +++++++++++++++++++
.../JoinGlobalPropertiesCompatibilityTest.java | 161 +++++++++++++++++++
.../JoinOnConflictingPartitioningsTest.java | 65 ++++++++
.../common/operators/SingleInputOperator.java | 12 +-
.../SingleInputSemanticProperties.java | 59 ++++++-
.../operators/base/PartitionOperatorBase.java | 7 +-
.../PartitionOperatorTranslationTest.scala | 60 +++++++
21 files changed, 743 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index b329a6e..32f0519 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -418,18 +418,25 @@ public abstract class TwoInputNode extends OptimizerNode {
* the pairs of global properties.
* *******************************************************************/
+ outer:
for (GlobalPropertiesPair gpp : allGlobalPairs) {
if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) &&
gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
{
- Channel c1Clone = c1.clone();
- c1Clone.setRequiredGlobalProps(gpp.getProperties1());
- c2.setRequiredGlobalProps(gpp.getProperties2());
-
- // we form a valid combination, so create the local candidates
- // for this
- addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
- break;
+ for (OperatorDescriptorDual desc : getProperties()) {
+ if (desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(),
+ c1.getGlobalProperties(), c2.getGlobalProperties()))
+ {
+ Channel c1Clone = c1.clone();
+ c1Clone.setRequiredGlobalProps(gpp.getProperties1());
+ c2.setRequiredGlobalProps(gpp.getProperties2());
+
+ // we form a valid combination, so create the local candidates
+ // for this
+ addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
+ break outer;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
index 0dd23bf..ae6bc6b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/WorksetIterationNode.java
@@ -514,6 +514,12 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
}
@Override
+ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2) {
+ return true;
+ }
+
+ @Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
@@ -562,6 +568,5 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
// no estimates are needed here
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index 84af77c..cb0e61c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -64,14 +64,14 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
// partition both (hash or custom)
RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
if (customPartitioner == null) {
- partitioned1.setHashPartitioned(this.keys1);
+ partitioned1.setAnyPartitioning(this.keys1);
} else {
partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner);
}
RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
if (customPartitioner == null) {
- partitioned2.setHashPartitioned(this.keys2);
+ partitioned2.setAnyPartitioning(this.keys2);
} else {
partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner);
}
@@ -96,6 +96,21 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
}
return pairs;
}
+
+ @Override
+ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2)
+ {
+ if (requested1.getPartitioning().isPartitionedOnKey() && requested2.getPartitioning().isPartitionedOnKey()) {
+ return produced1.getPartitioning() == produced2.getPartitioning() &&
+ (produced1.getCustomPartitioner() == null ?
+ produced2.getCustomPartitioner() == null :
+ produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+ } else {
+ return true;
+ }
+
+ }
@Override
public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
index a78f8e7..c393fc2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/BinaryUnionOpDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.flink.compiler.dag.TwoInputNode;
import org.apache.flink.compiler.dataproperties.GlobalProperties;
import org.apache.flink.compiler.dataproperties.LocalProperties;
import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
import org.apache.flink.compiler.plan.BinaryUnionPlanNode;
import org.apache.flink.compiler.plan.Channel;
@@ -87,4 +88,10 @@ public class BinaryUnionOpDescriptor extends OperatorDescriptorDual {
LocalProperties produced1, LocalProperties produced2) {
return true;
}
+
+ @Override
+ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2) {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
index 5107486..fefd71a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
@@ -78,6 +78,12 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual
return Collections.singletonList(new LocalPropertiesPair(
new RequestedLocalProperties(), new RequestedLocalProperties()));
}
+
+ @Override
+ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2) {
+ return true;
+ }
@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index 55db89d..90e4c3b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.operators;
import java.util.Collections;
@@ -37,9 +36,6 @@ import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.util.Utils;
import org.apache.flink.runtime.operators.DriverStrategy;
-/**
- *
- */
public class CoGroupDescriptor extends OperatorDescriptorDual {
private final Ordering ordering1; // ordering on the first input
@@ -110,6 +106,16 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
}
@Override
+ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2)
+ {
+ return produced1.getPartitioning() == produced2.getPartitioning() &&
+ (produced1.getCustomPartitioner() == null ?
+ produced2.getCustomPartitioner() == null :
+ produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+ }
+
+ @Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2)
{
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
index be14d80..1ec1905 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterFirstDescriptor.java
@@ -16,15 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.operators;
import org.apache.flink.compiler.dataproperties.LocalProperties;
import org.apache.flink.runtime.operators.DriverStrategy;
-/**
- *
- */
+
public class CrossBlockOuterFirstDescriptor extends CartesianProductDescriptor {
public CrossBlockOuterFirstDescriptor() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
index 4fcfef1..a10f1a4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossBlockOuterSecondDescriptor.java
@@ -16,15 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.operators;
import org.apache.flink.compiler.dataproperties.LocalProperties;
import org.apache.flink.runtime.operators.DriverStrategy;
-/**
- *
- */
+
public class CrossBlockOuterSecondDescriptor extends CartesianProductDescriptor {
public CrossBlockOuterSecondDescriptor() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
index f80f404..a0512d4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterFirstDescriptor.java
@@ -16,15 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.operators;
import org.apache.flink.compiler.dataproperties.LocalProperties;
import org.apache.flink.runtime.operators.DriverStrategy;
-/**
- *
- */
+
public class CrossStreamOuterFirstDescriptor extends CartesianProductDescriptor {
public CrossStreamOuterFirstDescriptor() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
index 6975760..9b6b122 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CrossStreamOuterSecondDescriptor.java
@@ -16,15 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.operators;
import org.apache.flink.compiler.dataproperties.LocalProperties;
import org.apache.flink.runtime.operators.DriverStrategy;
-/**
- *
- */
+
public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor {
public CrossStreamOuterSecondDescriptor() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
index 11224ec..8eca16e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
@@ -70,6 +70,9 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
protected abstract List<LocalPropertiesPair> createPossibleLocalProperties();
+ public abstract boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2);
+
public abstract boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
index 7ccdda0..a17096f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/UtilSinkJoinOpDescriptor.java
@@ -59,9 +59,14 @@ public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual {
}
@Override
+ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2) {
+ return true;
+ }
+
+ @Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
- LocalProperties produced1, LocalProperties produced2)
- {
+ LocalProperties produced1, LocalProperties produced2) {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
new file mode 100644
index 0000000..ff3d78e
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings({"serial","unchecked"})
+public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase {
+
+ @Test
+ public void testCompatiblePartitioning() {
+ try {
+ final Partitioner<Long> partitioner = new Partitioner<Long>() {
+ @Override
+ public int partition(Long key, int numPartitions) {
+ return 0;
+ }
+ };
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+ DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ input1.partitionCustom(partitioner, 1)
+ .join(input2.partitionCustom(partitioner, 0))
+ .where(1).equalTo(0)
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode partitioner1 = (SingleInputPlanNode) join.getInput1().getSource();
+ SingleInputPlanNode partitioner2 = (SingleInputPlanNode) join.getInput2().getSource();
+
+ assertEquals(ShipStrategyType.FORWARD, join.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, join.getInput2().getShipStrategy());
+
+ assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner1.getInput().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner2.getInput().getShipStrategy());
+ assertEquals(partitioner, partitioner1.getInput().getPartitioner());
+ assertEquals(partitioner, partitioner2.getInput().getPartitioner());
+
+ new NepheleJobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java
new file mode 100644
index 0000000..26185c7
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/PartitioningOperatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.java;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitioningOperatorTest extends CompilerTestBase {
+
+ @Test
+ public void testPartitiongOperatorPreservesFields() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<Long, Long>(0L, 0L)));
+
+ data.partitionCustom(new Partitioner<Long>() {
+ public int partition(Long key, int numPartitions) { return key.intValue(); }
+ }, 1)
+ .groupBy(1)
+ .reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();
+
+ assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
new file mode 100644
index 0000000..ae65094
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/CoGroupCompatibilityTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CoGroupCompatibilityTest {
+
+ @Test
+ public void checkCompatiblePartitionings() {
+ try {
+ final FieldList keysLeft = new FieldList(1, 4);
+ final FieldList keysRight = new FieldList(3, 1);
+
+ CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+
+ // test compatible hash partitioning
+ {
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setHashPartitioned(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setHashPartitioned(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setHashPartitioned(keysLeft);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setHashPartitioned(keysRight);
+
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test compatible custom partitioning
+ {
+ Partitioner<Object> part = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setCustomPartitioned(keysLeft, part);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setCustomPartitioned(keysRight, part);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setCustomPartitioned(keysLeft, part);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part);
+
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test custom partitioning matching any partitioning
+ {
+ Partitioner<Object> part = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setAnyPartitioning(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setAnyPartitioning(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setCustomPartitioned(keysLeft, part);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part);
+
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void checkInompatiblePartitionings() {
+ try {
+ final FieldList keysLeft = new FieldList(1);
+ final FieldList keysRight = new FieldList(3);
+
+ final Partitioner<Object> part = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+ final Partitioner<Object> part2 = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+
+ CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
+
+ // test incompatible hash with custom partitioning
+ {
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setAnyPartitioning(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setAnyPartitioning(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setHashPartitioned(keysLeft);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part);
+
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test incompatible custom partitionings
+ {
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setAnyPartitioning(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setAnyPartitioning(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setCustomPartitioned(keysLeft, part);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part2);
+
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java
new file mode 100644
index 0000000..13f4102
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinGlobalPropertiesCompatibilityTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class JoinGlobalPropertiesCompatibilityTest {
+
+ @Test
+ public void checkCompatiblePartitionings() {
+ try {
+ final FieldList keysLeft = new FieldList(1, 4);
+ final FieldList keysRight = new FieldList(3, 1);
+
+ SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+
+ // test compatible hash partitioning
+ {
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setHashPartitioned(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setHashPartitioned(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setHashPartitioned(keysLeft);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setHashPartitioned(keysRight);
+
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test compatible custom partitioning
+ {
+ Partitioner<Object> part = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setCustomPartitioned(keysLeft, part);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setCustomPartitioned(keysRight, part);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setCustomPartitioned(keysLeft, part);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part);
+
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test custom partitioning matching any partitioning
+ {
+ Partitioner<Object> part = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setAnyPartitioning(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setAnyPartitioning(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setCustomPartitioned(keysLeft, part);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part);
+
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void checkInompatiblePartitionings() {
+ try {
+ final FieldList keysLeft = new FieldList(1);
+ final FieldList keysRight = new FieldList(3);
+
+ final Partitioner<Object> part = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+ final Partitioner<Object> part2 = new Partitioner<Object>() {
+ @Override
+ public int partition(Object key, int numPartitions) {
+ return 0;
+ }
+ };
+
+ SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+
+ // test incompatible hash with custom partitioning
+ {
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setAnyPartitioning(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setAnyPartitioning(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setHashPartitioned(keysLeft);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part);
+
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test incompatible custom partitionings
+ {
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setAnyPartitioning(keysLeft);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setAnyPartitioning(keysRight);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setCustomPartitioned(keysLeft, part);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setCustomPartitioned(keysRight, part2);
+
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java
new file mode 100644
index 0000000..c8713e6
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/operators/JoinOnConflictingPartitioningsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerException;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.PactCompiler;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
+
+ @Test
+ public void testRejectJoinOnHashAndRangePartitioning() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+ Configuration cfg = new Configuration();
+ cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+ cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+
+ input.join(input).where(0).equalTo(0)
+ .withParameters(cfg)
+ .print();
+
+ Plan p = env.createProgramPlan();
+ try {
+ compileNoStats(p);
+ fail("This should fail with an exception");
+ }
+ catch (CompilerException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
index f1bf2ad..5b491bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
@@ -35,19 +35,13 @@ import org.apache.flink.util.Visitor;
*/
public abstract class SingleInputOperator<IN, OUT, FT extends Function> extends AbstractUdfOperator<OUT, FT> {
- /**
- * The input which produces the data consumed by this operator.
- */
+ /** The input which produces the data consumed by this operator. */
protected Operator<IN> input;
- /**
- * The positions of the keys in the tuple.
- */
+ /** The positions of the keys in the tuple. */
private final int[] keyFields;
- /**
- * Semantic properties of the associated function.
- */
+ /** Semantic properties of the associated function. */
private SingleInputSemanticProperties semanticProperties;
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
index 2de53eb..45f020a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputSemanticProperties.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.operators;
import java.util.HashMap;
@@ -149,5 +148,63 @@ public class SingleInputSemanticProperties extends SemanticProperties {
this.forwardedFields = new HashMap<Integer,FieldSet>();
this.readFields = null;
}
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class AllFieldsConstantProperties extends SingleInputSemanticProperties {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FieldSet getReadFields() {
+ return FieldSet.EMPTY_SET;
+ }
+
+ @Override
+ public FieldSet getWrittenFields() {
+ return FieldSet.EMPTY_SET;
+ }
+
+ @Override
+ public FieldSet getForwardedField(int sourceField) {
+ return new FieldSet(sourceField);
+ }
+
+ // ----- all mutating operations are unsupported -----
+
+ @Override
+ public void addForwardedField(int sourceField, FieldSet destinationFields) {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public void addForwardedField(int sourceField, int destinationField) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setForwardedField(int sourceField, FieldSet destinationFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addReadFields(FieldSet readFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReadFields(FieldSet readFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addWrittenFields(FieldSet writtenFields) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setWrittenFields(FieldSet writtenFields) {
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index ee3b259..6735298 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.NoOpFunction;
import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
/**
- *
* @param <IN> The input and result type.
*/
public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpFunction> {
@@ -79,6 +79,11 @@ public class PartitionOperatorBase<IN> extends SingleInputOperator<IN, IN, NoOpF
}
this.customPartitioner = customPartitioner;
}
+
+ @Override
+ public SingleInputSemanticProperties getSemanticProperties() {
+ return new SingleInputSemanticProperties.AllFieldsConstantProperties();
+ }
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cf54a1c2/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
new file mode 100644
index 0000000..1e44413
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.compiler
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.flink.api.scala._
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType
+import org.apache.flink.test.compiler.util.CompilerTestBase
+import org.apache.flink.compiler.plan.SingleInputPlanNode
+import org.apache.flink.api.common.functions.Partitioner
+ /** Inspects the compiled plan of a program that groups on a field right after custom-partitioning on that field. */
+class PartitionOperatorTranslationTest extends CompilerTestBase {
+ /** Expects PARTITION_CUSTOM into the partitioner and FORWARD into the group reducer that follows it. */
+ @Test
+ def testPartitiongOperatorPreservesFields() {
+ try {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ // One-tuple source, custom-partitioned on field 1, then grouped on the same field 1.
+ val data = env.fromElements( (0L, 0L) )
+ data.partitionCustom(new Partitioner[Long]() {
+ def partition(key: Long, numPartitions: Int): Int = key.intValue()
+ }, 1)
+ .groupBy(1).reduceGroup( x => x)
+ .print()
+ // Create the program plan and pass it through compileNoStats.
+ val p = env.createProgramPlan()
+ val op = compileNoStats(p)
+ // Walk the plan backwards from the first sink: sink <- reducer <- partitioner.
+ val sink = op.getDataSinks.iterator().next()
+ val reducer = sink.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+ val partitioner = reducer.getInput.getSource.asInstanceOf[SingleInputPlanNode]
+ // The custom partitioning must show up on the partitioner's input; the reducer's input must only be forwarded.
+ assertEquals(ShipStrategyType.FORWARD, reducer.getInput.getShipStrategy)
+ assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
+ }
+ catch {
+ case e: Exception => { // any exception is printed and turned into a test failure carrying its message
+ e.printStackTrace()
+ fail(e.getMessage)
+ }
+ }
+ }
+}