You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/03 14:51:04 UTC
[3/3] git commit: [FLINK-932] Introduce automatic generation of semantic props for projections
[FLINK-932] Introduce automatic generation of semantic props for projections
This closes #29
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/883474dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/883474dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/883474dd
Branch: refs/heads/master
Commit: 883474ddfd749eedceb85bdd2925a3a76de75206
Parents: 8a8060e
Author: sebastian kunert <sk...@gmail.com>
Authored: Thu Jun 19 16:17:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 3 14:49:39 2014 +0200
----------------------------------------------------------------------
.../api/java/functions/SemanticPropUtil.java | 21 +++
.../api/java/operators/CrossOperator.java | 34 ++++-
.../api/java/operators/JoinOperator.java | 36 ++++-
.../api/java/operators/ProjectOperator.java | 5 +-
.../SemanticPropertiesProjectionTest.java | 144 +++++++++++++++++++
5 files changed, 233 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
index 4d767b0..9f661f0 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
@@ -48,6 +48,27 @@ public class SemanticPropUtil {
private static final Pattern PATTERN_DIGIT = Pattern.compile("\\d+");
+ public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields) {
+ SingleInputSemanticProperties ssp = new SingleInputSemanticProperties();
+ for (int i = 0; i < fields.length; i++) {
+ ssp.addForwardedField(fields[i], i);
+ }
+ return ssp;
+ }
+
+ public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) {
+ DualInputSemanticProperties dsp = new DualInputSemanticProperties();
+
+ for (int i = 0; i < fields.length; i++) {
+ if (isFromFirst[i]) {
+ dsp.addForwardedField1(fields[i], i);
+ } else {
+ dsp.addForwardedField2(fields[i], i);
+ }
+ }
+ return dsp;
+ }
+
public static SingleInputSemanticProperties getSemanticPropsSingle(Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
if (set == null) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
index b497407..76e5320 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
@@ -18,10 +18,12 @@ import java.util.Arrays;
import eu.stratosphere.api.common.functions.GenericCrosser;
import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
+import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.CrossOperatorBase;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.functions.CrossFunction;
+import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
@@ -49,9 +51,19 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
super(input1, input2, returnType);
this.function = function;
- extractSemanticAnnotationsFromUdf(function.getClass());
+
+ if (!(function instanceof ProjectCrossFunction)) {
+ extractSemanticAnnotationsFromUdf(function.getClass());
+ } else {
+ generateProjectionProperties(((ProjectCrossFunction) function));
+ }
+ }
+
+ public void generateProjectionProperties(ProjectCrossFunction pcf) {
+ DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pcf.getFields(), pcf.getIsFromFirst());
+ setSemanticProperties(props);
}
-
+
@Override
protected eu.stratosphere.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, GenericCrosser<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
@@ -187,6 +199,16 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
super(input1, input2,
new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType);
}
+
+ @Override
+ public CrossOperator<I1, I2, OUT> withConstantSetFirst(String... constantSetFirst) {
+ throw new RuntimeException("Please do not use ConstantFields on ProjectCross. The Fields are automatically calculated.");
+ }
+
+ @Override
+ public CrossOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) {
+ throw new RuntimeException("Please do not use ConstantFields on ProjectCross. The Fields are automatically calculated.");
+ }
}
public static final class ProjectCrossFunction<T1, T2, R extends Tuple> extends CrossFunction<T1, T2, R> {
@@ -234,6 +256,14 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
}
return outTuple;
}
+
+ protected int[] getFields() {
+ return fields;
+ }
+
+ protected boolean[] getIsFromFirst() {
+ return isFromFirst;
+ }
}
public static final class CrossProjection<I1, I2> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
index 9479c28..c96f05c 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
@@ -21,6 +21,7 @@ import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.functions.GenericJoiner;
import eu.stratosphere.api.common.functions.GenericMap;
import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
+import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
@@ -29,6 +30,7 @@ import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.DeltaIteration.SolutionSetPlaceHolder;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.operators.Keys.FieldPositionKeys;
import eu.stratosphere.api.java.operators.translation.KeyExtractingMapper;
import eu.stratosphere.api.java.operators.translation.PlanUnwrappingJoinOperator;
@@ -163,9 +165,19 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
this.function = function;
- extractSemanticAnnotationsFromUdf(function.getClass());
+
+ if (!(function instanceof ProjectJoinFunction)) {
+ extractSemanticAnnotationsFromUdf(function.getClass());
+ } else {
+ generateProjectionProperties(((ProjectJoinFunction) function));
+ }
}
-
+
+ public void generateProjectionProperties(ProjectJoinFunction pjf) {
+ DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), pjf.getIsFromFirst());
+ setSemanticProperties(props);
+ }
+
// TODO
// public EquiJoin<I1, I2, OUT> leftOuter() {
// this.preserve1 = true;
@@ -512,6 +524,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
new ProjectJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
returnType, hint);
}
+
+ @Override
+ public JoinOperator<I1, I2, OUT> withConstantSetFirst(String... constantSetFirst) {
+ throw new RuntimeException("Please do not use ConstantFields on ProjectJoins. The Fields are automatically calculated.");
+ }
+
+ @Override
+ public JoinOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) {
+ throw new RuntimeException("Please do not use ConstantFields on ProjectJoins. The Fields are automatically calculated.");
+ }
}
// @SuppressWarnings("unused")
@@ -824,7 +846,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
this.isFromFirst = isFromFirst;
this.outTuple = outTupleInstance;
}
-
+
+ protected int[] getFields() {
+ return fields;
+ }
+
+ protected boolean[] getIsFromFirst() {
+ return isFromFirst;
+ }
+
public R join(T1 in1, T2 in2) {
for(int i=0; i<fields.length; i++) {
if(isFromFirst[i]) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
index 1907393..35406d6 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
@@ -19,6 +19,7 @@ import java.util.Arrays;
import eu.stratosphere.api.common.functions.GenericMap;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.operators.translation.PlanProjectOperator;
//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
import eu.stratosphere.api.java.tuple.*;
@@ -54,11 +55,11 @@ public class ProjectOperator<IN, OUT extends Tuple>
ppo.setInput(input);
// set dop
ppo.setDegreeOfParallelism(this.getParallelism());
-
+ ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields));
+
return ppo;
}
-
public static class Projection<T> {
private final DataSet<T> ds;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java
new file mode 100644
index 0000000..4e812da
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -0,0 +1,144 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.functions;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
+import eu.stratosphere.api.common.operators.SingleInputSemanticProperties;
+import eu.stratosphere.api.common.operators.base.CrossOperatorBase;
+import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
+import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.operators.translation.PlanProjectOperator;
+import eu.stratosphere.api.java.tuple.Tuple4;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Created by sebastian on 6/19/14.
+ */
+public class SemanticPropertiesProjectionTest {
+
+ final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+ final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+
+ @Test
+ public void ProjectOperatorTest() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ tupleDs.project(1, 3, 2).types(Long.class, Long.class, String.class).print();
+
+ Plan plan = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+ PlanProjectOperator<?, ?> projectOperator = ((PlanProjectOperator) sink.getInput());
+
+ SingleInputSemanticProperties props = projectOperator.getSemanticProperties();
+
+ assertTrue(props.getForwardedField(1).size() == 1);
+ assertTrue(props.getForwardedField(3).size() == 1);
+ assertTrue(props.getForwardedField(2).size() == 1);
+ assertTrue(props.getForwardedField(1).contains(0));
+ assertTrue(props.getForwardedField(3).contains(1));
+ assertTrue(props.getForwardedField(2).contains(2));
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Exception in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void JoinProjectionTest() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2, 3).projectSecond(1, 4).types(String.class, Long.class, Long.class, Integer.class).print();
+
+ Plan plan = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+ JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
+
+ DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties();
+
+ assertTrue(props.getForwardedField1(2).size() == 1);
+ assertTrue(props.getForwardedField1(3).size() == 1);
+ assertTrue(props.getForwardedField2(1).size() == 1);
+ assertTrue(props.getForwardedField2(4).size() == 1);
+ assertTrue(props.getForwardedField1(2).contains(0));
+ assertTrue(props.getForwardedField1(3).contains(1));
+ assertTrue(props.getForwardedField2(1).contains(2));
+ assertTrue(props.getForwardedField2(4).contains(3));
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Exception in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void CrossProjectionTest() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ DataSet<Tuple4<String, Long, Long, Integer>> result = tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4).types(String.class, Long.class, Long.class, Integer.class);
+ result.print();
+
+ Plan plan = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+ CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = ((CrossOperatorBase<?, ?, ?, ?>) sink.getInput());
+
+ DualInputSemanticProperties props = projectCrossOperator.getSemanticProperties();
+
+ assertTrue(props.getForwardedField1(2).size() == 1);
+ assertTrue(props.getForwardedField1(3).size() == 1);
+ assertTrue(props.getForwardedField2(1).size() == 1);
+ assertTrue(props.getForwardedField2(4).size() == 1);
+ assertTrue(props.getForwardedField1(2).contains(0));
+ assertTrue(props.getForwardedField1(3).contains(1));
+ assertTrue(props.getForwardedField2(1).contains(2));
+ assertTrue(props.getForwardedField2(4).contains(3));
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Exception in test: " + e.getMessage());
+ }
+ }
+}