You are viewing a plain-text version of this content; the link to its canonical version did not survive the plain-text conversion.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/03 14:51:04 UTC

[3/3] git commit: [FLINK-932] Introduce automatic generation of semantic props for projections

[FLINK-932] Introduce automatic generation of semantic props for projections

This closes #29


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/883474dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/883474dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/883474dd

Branch: refs/heads/master
Commit: 883474ddfd749eedceb85bdd2925a3a76de75206
Parents: 8a8060e
Author: sebastian kunert <sk...@gmail.com>
Authored: Thu Jun 19 16:17:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 3 14:49:39 2014 +0200

----------------------------------------------------------------------
 .../api/java/functions/SemanticPropUtil.java    |  21 +++
 .../api/java/operators/CrossOperator.java       |  34 ++++-
 .../api/java/operators/JoinOperator.java        |  36 ++++-
 .../api/java/operators/ProjectOperator.java     |   5 +-
 .../SemanticPropertiesProjectionTest.java       | 144 +++++++++++++++++++
 5 files changed, 233 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
index 4d767b0..9f661f0 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
@@ -48,6 +48,27 @@ public class SemanticPropUtil {
 
 	private static final Pattern PATTERN_DIGIT = Pattern.compile("\\d+");
 
+	public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields) {
+		SingleInputSemanticProperties props = new SingleInputSemanticProperties();
+		for (int outPos = 0; outPos < fields.length; outPos++) {
+			props.addForwardedField(fields[outPos], outPos); // input field fields[outPos] is forwarded to output position outPos
+		}
+		return props;
+	}
+
+	public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) {
+		DualInputSemanticProperties props = new DualInputSemanticProperties();
+		// output position outPos is filled from field fields[outPos] of the input selected by isFromFirst[outPos]
+		for (int outPos = 0; outPos < fields.length; outPos++) {
+			if (!isFromFirst[outPos]) {
+				props.addForwardedField2(fields[outPos], outPos);
+			} else {
+				props.addForwardedField1(fields[outPos], outPos);
+			}
+		}
+		return props;
+	}
+
 	public static SingleInputSemanticProperties getSemanticPropsSingle(Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
 		if (set == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
index b497407..76e5320 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/CrossOperator.java
@@ -18,10 +18,12 @@ import java.util.Arrays;
 
 import eu.stratosphere.api.common.functions.GenericCrosser;
 import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
+import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
 import eu.stratosphere.api.common.operators.Operator;
 import eu.stratosphere.api.common.operators.base.CrossOperatorBase;
 import eu.stratosphere.api.java.DataSet;
 import eu.stratosphere.api.java.functions.CrossFunction;
+import eu.stratosphere.api.java.functions.SemanticPropUtil;
 import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
 import eu.stratosphere.api.java.typeutils.TypeExtractor;
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
@@ -49,9 +51,19 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		super(input1, input2, returnType);
 
 		this.function = function;
-		extractSemanticAnnotationsFromUdf(function.getClass());
+
+		if (!(function instanceof ProjectCrossFunction)) {
+			extractSemanticAnnotationsFromUdf(function.getClass());
+		} else {
+			generateProjectionProperties(((ProjectCrossFunction) function));
+		}
+	}
+
+	public final void generateProjectionProperties(ProjectCrossFunction<?, ?, ?> pcf) { // final: called from the constructor
+		DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pcf.getFields(), pcf.getIsFromFirst());
+		setSemanticProperties(props); // forwarded fields follow directly from the projected field indices
 	}
-	
+
 	@Override
 	protected eu.stratosphere.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, GenericCrosser<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 		
@@ -187,6 +199,16 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			super(input1, input2,
 				new ProjectCrossFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), returnType);
 		}
+
+		@Override // semantic properties of a projection are derived automatically
+		public CrossOperator<I1, I2, OUT> withConstantSetFirst(String... constantSetFirst) {
+			throw new UnsupportedOperationException("Please do not use ConstantFields on ProjectCross. The Fields are automatically calculated.");
+		}
+
+		@Override // semantic properties of a projection are derived automatically
+		public CrossOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) {
+			throw new UnsupportedOperationException("Please do not use ConstantFields on ProjectCross. The Fields are automatically calculated.");
+		}
 	}
 
 	public static final class ProjectCrossFunction<T1, T2, R extends Tuple> extends CrossFunction<T1, T2, R> {
@@ -234,6 +256,14 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			}
 			return outTuple;
 		}
+
+		protected int[] getFields() {
+			return this.fields; // projected field index for each output position
+		}
+
+		protected boolean[] getIsFromFirst() {
+			return this.isFromFirst; // per output position: true if the field comes from the first input
+		}
 	}
 
 	public static final class CrossProjection<I1, I2> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
index 9479c28..c96f05c 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/JoinOperator.java
@@ -21,6 +21,7 @@ import eu.stratosphere.api.common.InvalidProgramException;
 import eu.stratosphere.api.common.functions.GenericJoiner;
 import eu.stratosphere.api.common.functions.GenericMap;
 import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
+import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
 import eu.stratosphere.api.common.operators.Operator;
 import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
 import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
@@ -29,6 +30,7 @@ import eu.stratosphere.api.java.DataSet;
 import eu.stratosphere.api.java.DeltaIteration.SolutionSetPlaceHolder;
 import eu.stratosphere.api.java.functions.JoinFunction;
 import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.functions.SemanticPropUtil;
 import eu.stratosphere.api.java.operators.Keys.FieldPositionKeys;
 import eu.stratosphere.api.java.operators.translation.KeyExtractingMapper;
 import eu.stratosphere.api.java.operators.translation.PlanUnwrappingJoinOperator;
@@ -163,9 +165,19 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 			
 			this.function = function;
-			extractSemanticAnnotationsFromUdf(function.getClass());
+
+			if (!(function instanceof ProjectJoinFunction)) {
+				extractSemanticAnnotationsFromUdf(function.getClass());
+			} else {
+				generateProjectionProperties(((ProjectJoinFunction) function));
+			}
 		}
-		
+
+		public final void generateProjectionProperties(ProjectJoinFunction<?, ?, ?> pjf) { // final so subclasses cannot override it
+			DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), pjf.getIsFromFirst());
+			setSemanticProperties(props); // forwarded fields follow directly from the projected field indices
+		}
+
 		// TODO
 //		public EquiJoin<I1, I2, OUT> leftOuter() {
 //			this.preserve1 = true;
@@ -512,6 +524,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 					new ProjectJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), 
 					returnType, hint);
 		}
+
+		@Override // semantic properties of a projection are derived automatically
+		public JoinOperator<I1, I2, OUT> withConstantSetFirst(String... constantSetFirst) {
+			throw new UnsupportedOperationException("Please do not use ConstantFields on ProjectJoins. The Fields are automatically calculated.");
+		}
+
+		@Override // semantic properties of a projection are derived automatically
+		public JoinOperator<I1, I2, OUT> withConstantSetSecond(String... constantSetSecond) {
+			throw new UnsupportedOperationException("Please do not use ConstantFields on ProjectJoins. The Fields are automatically calculated.");
+		}
 	}
 	
 //	@SuppressWarnings("unused")
@@ -824,7 +846,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			this.isFromFirst = isFromFirst;
 			this.outTuple = outTupleInstance;
 		}
-		
+
+		protected int[] getFields() {
+			return this.fields; // projected field index for each output position
+		}
+
+		protected boolean[] getIsFromFirst() {
+			return this.isFromFirst; // per output position: true if the field comes from the first input
+		}
+
 		public R join(T1 in1, T2 in2) {
 			for(int i=0; i<fields.length; i++) {
 				if(isFromFirst[i]) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
index 1907393..35406d6 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
@@ -19,6 +19,7 @@ import java.util.Arrays;
 import eu.stratosphere.api.common.functions.GenericMap;
 import eu.stratosphere.api.common.operators.Operator;
 import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.functions.SemanticPropUtil;
 import eu.stratosphere.api.java.operators.translation.PlanProjectOperator;
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import eu.stratosphere.api.java.tuple.*;
@@ -54,11 +55,11 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		ppo.setInput(input);
 		// set dop
 		ppo.setDegreeOfParallelism(this.getParallelism());
-		
+		ppo.setSemanticProperties(SemanticPropUtil.createProjectionPropertiesSingle(fields));
+
 		return ppo;
 	}
 
-	
 	public static class Projection<T> {
 		
 		private final DataSet<T> ds;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/883474dd/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java
new file mode 100644
index 0000000..4e812da
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -0,0 +1,144 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.functions;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
+import eu.stratosphere.api.common.operators.SingleInputSemanticProperties;
+import eu.stratosphere.api.common.operators.base.CrossOperatorBase;
+import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
+import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.operators.translation.PlanProjectOperator;
+import eu.stratosphere.api.java.tuple.Tuple4;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Checks that project, projecting join and projecting cross automatically forward each projected field.
+ */
+public class SemanticPropertiesProjectionTest {
+
+	final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO
+	);
+
+
+	@Test
+	public void testProjectOperator() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+			tupleDs.project(1, 3, 2).types(Long.class, Long.class, String.class).print();
+
+			Plan plan = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+			PlanProjectOperator<?, ?> projectOperator = (PlanProjectOperator<?, ?>) sink.getInput();
+
+			SingleInputSemanticProperties props = projectOperator.getSemanticProperties();
+
+			assertEquals(1, props.getForwardedField(1).size());
+			assertEquals(1, props.getForwardedField(3).size());
+			assertEquals(1, props.getForwardedField(2).size());
+			assertTrue(props.getForwardedField(1).contains(0));
+			assertTrue(props.getForwardedField(3).contains(1));
+			assertTrue(props.getForwardedField(2).contains(2));
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinProjection() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+			tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2, 3).projectSecond(1, 4).types(String.class, Long.class, Long.class, Integer.class).print();
+
+			Plan plan = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+			JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+
+			DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties();
+
+			assertEquals(1, props.getForwardedField1(2).size());
+			assertEquals(1, props.getForwardedField1(3).size());
+			assertEquals(1, props.getForwardedField2(1).size());
+			assertEquals(1, props.getForwardedField2(4).size());
+			assertTrue(props.getForwardedField1(2).contains(0));
+			assertTrue(props.getForwardedField1(3).contains(1));
+			assertTrue(props.getForwardedField2(1).contains(2));
+			assertTrue(props.getForwardedField2(4).contains(3));
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testCrossProjection() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+			DataSet<Tuple4<String, Long, Long, Integer>> result = tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4).types(String.class, Long.class, Long.class, Integer.class);
+			result.print();
+
+			Plan plan = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
+			CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = (CrossOperatorBase<?, ?, ?, ?>) sink.getInput();
+
+			DualInputSemanticProperties props = projectCrossOperator.getSemanticProperties();
+
+			assertEquals(1, props.getForwardedField1(2).size());
+			assertEquals(1, props.getForwardedField1(3).size());
+			assertEquals(1, props.getForwardedField2(1).size());
+			assertEquals(1, props.getForwardedField2(4).size());
+			assertTrue(props.getForwardedField1(2).contains(0));
+			assertTrue(props.getForwardedField1(3).contains(1));
+			assertTrue(props.getForwardedField2(1).contains(2));
+			assertTrue(props.getForwardedField2(4).contains(3));
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+}