Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2015/01/14 23:36:33 UTC

[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/311

    [FLINK-1328] Reworked semantic annotations

    This PR is based on #83
    
    Open Issues:
    - Naming of class annotations (`ConstantFields`, `ConstantFieldsExcept`, `ReadFields`, ...) and API methods (`withConstantSet()`, ...). The terms "constant fields" and "constant set" are a bit inconsistent. Do we want to keep the term *constant*, or should we go with *preserved* or *forwarded* fields? Right now, the original naming is preserved.
    - Documentation other than the JavaDocs is not done yet.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink constantFields

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #311
    
----
commit 25af81509c997b1f6748447db97bfe59322dde73
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-08-31T17:22:56Z

    integrated forwarded Fields into the optimizer

commit e6fb7116e3c0cbf2b615f0f3f580aa37707df3d3
Author: Fabian Hueske <fh...@apache.org>
Date:   2014-12-17T17:59:14Z

    [FLINK-1328] Reworked semantic annotations for functions.
    - Constant fields can be defined for (nested) tuples, Pojos, case classes
    - Added semantic function information to example programs

commit a6f50ac7238507c8bb8b8c1739170fa95ca8e7d6
Author: Fabian Hueske <fh...@apache.org>
Date:   2015-01-14T15:48:07Z

    [FLINK-1328] Added previously failing semantic annotations to examples and fixed them by enabling partitioning on AtomicTypes.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-71598228
  
    +1 from my side



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-70226947
  
    How about names along the lines of "Unmodified Fields"?



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-71470851
  
    Addressed most comments and renamed constantFields/Sets to forwardedFields, as discussed on the dev mailing list.
    I would like to merge this soon.



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-70363765
  
    Another question: Would it make sense to annotate the `DefaultFlatJoinFunction` in `JoinOperator.java`?
    We know for sure what the function is going to do. Right now, the optimizer doesn't know that the partitioning is preserved.



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23070484
  
    --- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java ---
    @@ -266,4 +268,44 @@ public void computeOutputEstimates(DataStatistics statistics) {
     		this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
     			in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
     	}
    +
    +	public static class UnionSemanticProperties implements SemanticProperties {
    +
    +		@Override
    +		public FieldSet getTargetFields(int input, int sourceField) {
    +			if (input != 0 && input != 1) {
    +				throw new IndexOutOfBoundsException();
    --- End diff --
    
    That's an internal exception. If something fails here, there is something wrong with the optimizer. Nothing a user can solve. Could make that clear though...



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23135998
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java ---
    @@ -56,218 +56,135 @@
     
     	
     	public DualInputSemanticProperties() {
    -		init();
    +		this.fieldMapping1 = new HashMap<Integer,FieldSet>();
    +		this.fieldMapping2 = new HashMap<Integer,FieldSet>();
    +		this.readFields1 = null;
    +		this.readFields2 = null;
     	}
    -	
    -	/**
    -	 * Adds, to the existing information, a field that is forwarded directly
    -	 * from the source record(s) in the first input to the destination
    -	 * record(s).
    -	 * 
    -	 * @param sourceField the position in the source record(s) from the first input
    -	 * @param destinationField the position in the destination record(s)
    -	 */
    -	public void addForwardedField1(int sourceField, int destinationField) {
    -		FieldSet old = this.forwardedFields1.get(sourceField);
    -		if (old == null) {
    -			old = FieldSet.EMPTY_SET;
    +
    +	@Override
    +	public FieldSet getTargetFields(int input, int sourceField) {
    +
    +		if (input != 0 && input != 1) {
    +			throw new IndexOutOfBoundsException();
    +		} else if (input == 0) {
    +
    +			return fieldMapping1.get(sourceField);
    +		} else {
    +			return fieldMapping2.get(sourceField);
     		}
    -		
    -		FieldSet fs = old.addField(destinationField);
    -		this.forwardedFields1.put(sourceField, fs);
     	}
    -	
    -	/**
    -	 * Adds, to the existing information, a field that is forwarded directly
    -	 * from the source record(s) in the first input to multiple fields in
    -	 * the destination record(s).
    -	 * 
    -	 * @param sourceField the position in the source record(s)
    -	 * @param destinationFields the position in the destination record(s)
    -	 */
    -	public void addForwardedField1(int sourceField, FieldSet destinationFields) {
    -		FieldSet old = this.forwardedFields1.get(sourceField);
    -		if (old == null) {
    -			old = FieldSet.EMPTY_SET;
    +
    +	@Override
    +	public int getSourceField(int input, int targetField) {
    +		Map<Integer, FieldSet> fieldMapping;
    +
    +		if (input != 0 && input != 1) {
    +			throw new IndexOutOfBoundsException();
    +		} else if (input == 0) {
    +			fieldMapping = fieldMapping1;
    +		} else {
    +			fieldMapping = fieldMapping2;
    +		}
    +
    +		for (int sourceField : fieldMapping.keySet()) {
    --- End diff --
    
    This can be done more efficiently by iterating over the map's `entrySet()`.
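
A minimal sketch of the suggestion above, not the code that was merged. The body of the loop is cut off in the quoted diff, so this assumes it looks up the `FieldSet` for each key and checks whether it contains `targetField`. `SourceFieldLookup` is a hypothetical class name, `Set<Integer>` stands in for Flink's `FieldSet`, and returning `-1` for "not found" is also an assumption. Iterating over `entrySet()` yields key and value together, so no separate `get()` call is needed per key.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone version of the lookup; Set<Integer> replaces FieldSet.
class SourceFieldLookup {

    // Returns the source field that is forwarded to targetField, or -1 if there is none.
    static int getSourceField(Map<Integer, Set<Integer>> fieldMapping, int targetField) {
        for (Map.Entry<Integer, Set<Integer>> entry : fieldMapping.entrySet()) {
            // Key and value arrive together, so no extra map lookup is required.
            if (entry.getValue().contains(targetField)) {
                return entry.getKey();
            }
        }
        return -1;
    }
}
```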



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23163896
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputSemanticProperties.java ---
    @@ -56,218 +56,135 @@
     
     	
     	public DualInputSemanticProperties() {
    -		init();
    +		this.fieldMapping1 = new HashMap<Integer,FieldSet>();
    +		this.fieldMapping2 = new HashMap<Integer,FieldSet>();
    +		this.readFields1 = null;
    +		this.readFields2 = null;
     	}
    -	
    -	/**
    -	 * Adds, to the existing information, a field that is forwarded directly
    -	 * from the source record(s) in the first input to the destination
    -	 * record(s).
    -	 * 
    -	 * @param sourceField the position in the source record(s) from the first input
    -	 * @param destinationField the position in the destination record(s)
    -	 */
    -	public void addForwardedField1(int sourceField, int destinationField) {
    -		FieldSet old = this.forwardedFields1.get(sourceField);
    -		if (old == null) {
    -			old = FieldSet.EMPTY_SET;
    +
    +	@Override
    +	public FieldSet getTargetFields(int input, int sourceField) {
    +
    +		if (input != 0 && input != 1) {
    +			throw new IndexOutOfBoundsException();
    +		} else if (input == 0) {
    +
    +			return fieldMapping1.get(sourceField);
    +		} else {
    +			return fieldMapping2.get(sourceField);
     		}
    -		
    -		FieldSet fs = old.addField(destinationField);
    -		this.forwardedFields1.put(sourceField, fs);
     	}
    -	
    -	/**
    -	 * Adds, to the existing information, a field that is forwarded directly
    -	 * from the source record(s) in the first input to multiple fields in
    -	 * the destination record(s).
    -	 * 
    -	 * @param sourceField the position in the source record(s)
    -	 * @param destinationFields the position in the destination record(s)
    -	 */
    -	public void addForwardedField1(int sourceField, FieldSet destinationFields) {
    -		FieldSet old = this.forwardedFields1.get(sourceField);
    -		if (old == null) {
    -			old = FieldSet.EMPTY_SET;
    +
    +	@Override
    +	public int getSourceField(int input, int targetField) {
    +		Map<Integer, FieldSet> fieldMapping;
    +
    +		if (input != 0 && input != 1) {
    +			throw new IndexOutOfBoundsException();
    +		} else if (input == 0) {
    +			fieldMapping = fieldMapping1;
    +		} else {
    +			fieldMapping = fieldMapping2;
    +		}
    +
    +		for (int sourceField : fieldMapping.keySet()) {
    --- End diff --
    
    Good point



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23073054
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -45,6 +46,16 @@
      */
     public class PojoTypeInfo<T> extends CompositeType<T>{
     
    +	private final static String REGEX_FIELD = "[a-zA-Z_\\$][a-zA-Z0-9_\\$]*";
    --- End diff --
    
    Right! Will change that.



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-70421463
  
    Looks good. The test coverage is massive - very good for a delicate concept like this :-)
    
    One remaining issue is the naming of the methods/concepts and how they are exposed in the APIs.
    
     - I personally liked it better the way it was before. For example, binary operators returned `DualInputSemanticProperties` and the methods were `getForwardedFieldsFirst(int field)`. That looked more intuitive for API users than operating on the interface `SemanticProperties` and using `getTargetFields(input, position)`.
    
     - The backwards access (get source field for target field) is relevant only in the optimizer, correct?
    Is there some way to expose that to the optimizer only? It makes the public API more confusing.
    
     - Is it necessary to have the extra method `hasFieldForwardInformation()`? This again makes it more complicated to use. Can we not simply have a null entry for fields that have no forward information?



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-70371949
  
    Right! 
    Some built-in operators, such as filter, projection, and aggregation, already had semantic annotations. With nesting support, semantic annotations can be added to more operators.
    I forgot to check that. Very good catch! :-)



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23163889
  
    --- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java ---
    @@ -86,8 +87,8 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
     	}
     
     	@Override
    -	public boolean isFieldConstant(int input, int fieldNumber) {
    -		return false;
    +	public SemanticProperties getSemanticProperties() {
    +		return null;
    --- End diff --
    
    Yes, will do that.



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23069617
  
    --- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java ---
    @@ -266,4 +268,44 @@ public void computeOutputEstimates(DataStatistics statistics) {
     		this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
     			in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
     	}
    +
    +	public static class UnionSemanticProperties implements SemanticProperties {
    +
    +		@Override
    +		public FieldSet getTargetFields(int input, int sourceField) {
    +			if (input != 0 && input != 1) {
    +				throw new IndexOutOfBoundsException();
    --- End diff --
    
    How about throwing an exception that explains that unions only support an input of 0 or 1?
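
A small sketch of what such a message could look like, under the assumption that the check stays an `IndexOutOfBoundsException`. `UnionInputCheck` and `checkInput` are hypothetical names that do not appear in the diff, and the wording of the message is only an illustration.

```java
// Hypothetical helper illustrating a more descriptive exception for the input check.
class UnionInputCheck {

    // Accepts only the two valid input indexes of a binary union.
    static void checkInput(int input) {
        if (input != 0 && input != 1) {
            throw new IndexOutOfBoundsException(
                "Invalid input index " + input
                    + ": a binary union only has inputs 0 and 1.");
        }
    }
}
```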



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23135475
  
    --- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/dag/AbstractPartialSolutionNode.java ---
    @@ -86,8 +87,8 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
     	}
     
     	@Override
    -	public boolean isFieldConstant(int input, int fieldNumber) {
    -		return false;
    +	public SemanticProperties getSemanticProperties() {
    +		return null;
    --- End diff --
    
    Can we improve null pointer safety by returning a special constant SemanticProperties object that marks all fields as changed?
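
One way to read this suggestion is the null-object pattern. The sketch below is an assumption about how that could look, not the merged implementation. `SemanticProps` is a simplified stand-in for the `SemanticProperties` interface, `Set<Integer>` replaces `FieldSet`, and `EmptySemanticProps` is a hypothetical name. An empty target set means "no field is forwarded", which is the same as marking every field as changed.

```java
import java.util.Collections;
import java.util.Set;

// Simplified stand-in for the SemanticProperties interface shown in the diff (assumption).
interface SemanticProps {
    Set<Integer> getTargetFields(int input, int sourceField);
    int getSourceField(int input, int targetField);
}

// Hypothetical shared instance returned instead of null: it reports no forwarded fields.
final class EmptySemanticProps implements SemanticProps {

    static final EmptySemanticProps INSTANCE = new EmptySemanticProps();

    private EmptySemanticProps() {}

    @Override
    public Set<Integer> getTargetFields(int input, int sourceField) {
        // No source field is forwarded to any target position.
        return Collections.emptySet();
    }

    @Override
    public int getSourceField(int input, int targetField) {
        // -1 is used here to signal "no source field"; this convention is an assumption.
        return -1;
    }
}
```

Callers can then use the result of `getSemanticProperties()` without a null check.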



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-70230559
  
    Except for the comments and the missing documentation, the change looks good.
    However, I cannot really validate the changes in the optimizer.



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/311



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-70517390
  
    IMO, users should only be allowed to set semantic properties through field expression strings. There should be no need to implement their own SemanticProperty class or to manually set the forwarded fields in SemanticProperty objects. Setting these fields is not trivial because field indices need to be flattened and the types of source and target fields should be checked for compatibility. From my point of view, this is clearly a developer API. Whoever gets in touch with it should not have problems dealing with input ids or implementing the backwards access.
    
    The `hasFieldForwardInformation()` method can be removed though. Will do that.



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-71613018
  
    +1 for merging it
    
    What's the plan with the documentation?



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23070438
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -45,6 +46,16 @@
      */
     public class PojoTypeInfo<T> extends CompositeType<T>{
     
    +	private final static String REGEX_FIELD = "[a-zA-Z_\\$][a-zA-Z0-9_\\$]*";
    --- End diff --
    
    Java allows Unicode letters and digits in field names, not only the ASCII characters matched by this regex.
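
To illustrate the point, here is a sketch of a field-name check based on the JDK's own identifier rules rather than an ASCII character class. `FieldNameCheck` and `isValidFieldName` are hypothetical names, and this is not necessarily how the regex was changed in the PR. The check does not reject reserved words such as `class`.

```java
// Hypothetical validator that follows Java's identifier rules via java.lang.Character.
class FieldNameCheck {

    static boolean isValidFieldName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        int first = name.codePointAt(0);
        if (!Character.isJavaIdentifierStart(first)) {
            return false;
        }
        // Walk the remaining code points; charCount handles supplementary characters.
        for (int i = Character.charCount(first); i < name.length(); ) {
            int codePoint = name.codePointAt(i);
            if (!Character.isJavaIdentifierPart(codePoint)) {
                return false;
            }
            i += Character.charCount(codePoint);
        }
        return true;
    }
}
```

With this check, a name such as `größe` is accepted, while `1abc` and `a.b` are rejected.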



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/311#issuecomment-70236149
  
    Thanks for the review!
    
    Proposed names for "constant field" semantic properties:
    * constant fields (current)
    * unmodified fields
    * forwarded fields
    * preserved fields
    
    I am leaning towards forwarded fields. Other opinions?



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23070841
  
    --- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java ---
    @@ -266,4 +268,44 @@ public void computeOutputEstimates(DataStatistics statistics) {
     		this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
     			in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
     	}
    +
    +	public static class UnionSemanticProperties implements SemanticProperties {
    +
    +		@Override
    +		public FieldSet getTargetFields(int input, int sourceField) {
    +			if (input != 0 && input != 1) {
    +				throw new IndexOutOfBoundsException();
    --- End diff --
    
    Ah, okay. Then it's fine.
    I saw many helpful exceptions in this change, so I guess the user-facing exceptions are more descriptive.



[GitHub] flink pull request: [FLINK-1328] Reworked semantic annotations

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/311#discussion_r23069937
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java ---
    @@ -40,38 +45,108 @@
     import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
     import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
     import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
    +import org.apache.flink.api.java.operators.Keys;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
     
     public class SemanticPropUtil {
     
    -	private final static String REGEX_LIST = "(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))";
    -	private final static String REGEX_FORWARD = "(\\s*(\\d+)\\s*->(" + REGEX_LIST + "|(\\*)))";
    -	private final static String REGEX_LIST_OR_FORWARD = "(" + REGEX_LIST + "|" + REGEX_FORWARD + ")";
    -	private final static String REGEX_ANNOTATION = "(\\s*(" + REGEX_LIST_OR_FORWARD + "\\s*;\\s*)*(" + REGEX_LIST_OR_FORWARD + "\\s*))";
    +	private final static String REGEX_WILDCARD = "[\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
    +	private final static String REGEX_SINGLE_FIELD = "[a-zA-Z0-9_\\$]+";
    +	private final static String REGEX_NESTED_FIELDS = "((" + REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\."+ REGEX_WILDCARD +")?";
     
    +	private final static String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + ";)*(" + REGEX_NESTED_FIELDS + ");?)";
    +	private final static String REGEX_FORWARD = "(("+ REGEX_NESTED_FIELDS +"|"+ REGEX_WILDCARD +")->(" + REGEX_NESTED_FIELDS + "|"+ REGEX_WILDCARD +"))";
    +	private final static String REGEX_FIELD_OR_FORWARD = "(" + REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")";
    +	private final static String REGEX_ANNOTATION = "((" + REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)";
    +
    +	private static final Pattern PATTERN_WILDCARD = Pattern.compile(REGEX_WILDCARD);
     	private static final Pattern PATTERN_FORWARD = Pattern.compile(REGEX_FORWARD);
     	private static final Pattern PATTERN_ANNOTATION = Pattern.compile(REGEX_ANNOTATION);
     	private static final Pattern PATTERN_LIST = Pattern.compile(REGEX_LIST);
    +	private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_NESTED_FIELDS);
     
    -	private static final Pattern PATTERN_DIGIT = Pattern.compile("\\d+");
    -
    -	public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields) {
    +	public static SingleInputSemanticProperties createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType)
    +	{
     		SingleInputSemanticProperties ssp = new SingleInputSemanticProperties();
    -		for (int i = 0; i < fields.length; i++) {
    -			ssp.addForwardedField(fields[i], i);
    +
    +		int[] sourceOffsets = new int[inType.getArity()];
    +		sourceOffsets[0] = 0;
    +		for(int i=1; i<inType.getArity(); i++) {
    +			sourceOffsets[i] = inType.getTypeAt(i-1).getTotalFields() + sourceOffsets[i-1];
     		}
    +
    +		int targetOffset = 0;
    +		for(int i=0; i<fields.length; i++) {
    +			int sourceOffset = sourceOffsets[fields[i]];
    +			int numFieldsToCopy = inType.getTypeAt(fields[i]).getTotalFields();
    +
    +			for(int j=0; j<numFieldsToCopy; j++) {
    +				ssp.addForwardedField(sourceOffset+j, targetOffset+j);
    +			}
    +			targetOffset += numFieldsToCopy;
    +		}
    +
     		return ssp;
     	}
     
    -	public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) {
    +	public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst,
    +																				TypeInformation<?> inType1, TypeInformation<?> inType2)
    +	{
     		DualInputSemanticProperties dsp = new DualInputSemanticProperties();
     
    -		for (int i = 0; i < fields.length; i++) {
    -			if (isFromFirst[i]) {
    -				dsp.addForwardedField1(fields[i], i);
    +		int[] sourceOffsets1;
    +		if(inType1 instanceof TupleTypeInfo<?>) {
    +			sourceOffsets1 = new int[inType1.getArity()];
    +			sourceOffsets1[0] = 0;
    +			for(int i=1; i<inType1.getArity(); i++) {
    +				sourceOffsets1[i] = ((TupleTypeInfo<?>)inType1).getTypeAt(i-1).getTotalFields() + sourceOffsets1[i-1];
    +			}
    +		} else {
    +			sourceOffsets1 = new int[] {0};
    +		}
    +
    +		int[] sourceOffsets2;
    +		if(inType2 instanceof TupleTypeInfo<?>) {
    +			sourceOffsets2 = new int[inType2.getArity()];
    +			sourceOffsets2[0] = 0;
    +			for(int i=1; i<inType2.getArity(); i++) {
    +				sourceOffsets2[i] = ((TupleTypeInfo<?>)inType2).getTypeAt(i-1).getTotalFields() + sourceOffsets2[i-1];
    +			}
    +		} else {
    +			sourceOffsets2 = new int[] {0};
    +		}
    --- End diff --
    
    Lines 97-117 look pretty copy-pasted. I think a method that takes a TupleTypeInfo and returns the source offsets would be nicer here.
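
The duplicated loops in the diff compute, for each top-level field, the flat position at which it starts. A possible shape for the shared helper is sketched below. To stay self-contained it takes the per-field flat sizes as an `int[]` instead of a `TupleTypeInfo`. In the diff those sizes come from `getTypeAt(i).getTotalFields()`. `SourceOffsets` and `computeOffsets` are hypothetical names.

```java
// Hypothetical helper: turns per-field flat sizes into starting offsets (a prefix sum).
class SourceOffsets {

    // totalFields[i] is the number of flat fields occupied by top-level field i.
    static int[] computeOffsets(int[] totalFields) {
        int[] offsets = new int[totalFields.length];
        // offsets[0] stays 0; every later field starts where the previous one ends.
        for (int i = 1; i < totalFields.length; i++) {
            offsets[i] = offsets[i - 1] + totalFields[i - 1];
        }
        return offsets;
    }
}
```

For a tuple whose fields occupy 1, 3, and 2 flat positions, `computeOffsets(new int[] {1, 3, 2})` returns `{0, 1, 4}`. A non-tuple input corresponds to a single-element array and yields `{0}`, matching the `else` branches in the diff.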

