You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2017/02/21 10:02:31 UTC

[GitHub] flink pull request #3375: [FLINK-5845] [cep] Unify keyed and non-keyed opera...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/3375

    [FLINK-5845] [cep] Unify keyed and non-keyed operators.

    This PR is the first step towards making the CEP library rescalable and backwards compatible.
    
    It just merges the keyed and non-keyed operators into a single, keyed one. A more detailed 
    description can be found in the related JIRA.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink cep-unification

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3375.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3375
    
----
commit 5fec00f79e9deb14c136b55489e96555d9194d1c
Author: kl0u <kk...@gmail.com>
Date:   2017-02-16T11:02:25Z

    [FLINK-5845] [cep] Unify keyed and non-keyed operators.
    
    Now all cep operators are keyed, and for the non-keyed
    usecases, we key on a dummy key and use the keyed operator.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3375: [FLINK-5845] [cep] Unify keyed and non-keyed operators.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/3375
  
    Thanks @tillrohrmann ! I integrated your comments and merged it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3375: [FLINK-5845] [cep] Unify keyed and non-keyed opera...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3375#discussion_r102462053
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---
    @@ -127,16 +134,36 @@
     					keySerializer,
     					nfaFactory));
     		} else {
    -			patternStream = inputStream.transform(
    +
    +			KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
    +			TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
    +
    +			patternStream = inputStream.keyBy(new NullByteKeySelector<T>()).transform(
     				"TimeoutCEPPatternOperator",
     				eitherTypeInformation,
    -				new TimeoutCEPPatternOperator<>(
    +				new TimeoutKeyedCEPPatternOperator<>(
     					inputSerializer,
     					isProcessingTime,
    +					keySelector,
    +					keySerializer,
     					nfaFactory
     				)).forceNonParallel();
     		}
     
     		return patternStream;
     	}
    +
    +	/**
    +	 * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
    --- End diff --
    
    Does not seem to fit here. Copy & paste artifact?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3375: [FLINK-5845] [cep] Unify keyed and non-keyed opera...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/3375


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3375: [FLINK-5845] [cep] Unify keyed and non-keyed opera...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3375#discussion_r102461630
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -100,27 +119,21 @@ public void open() throws Exception {
     
     		if (nfaOperatorState == null) {
     			nfaOperatorState = getPartitionedState(
    -					new ValueStateDescriptor<NFA<IN>>(
    -						NFA_OPERATOR_STATE_NAME,
    -						new NFA.Serializer<IN>()));
    +				new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>()));
     		}
     
     		@SuppressWarnings("unchecked,rawtypes")
     		TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
    -				(TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
    +			(TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
     
     		if (priorityQueueOperatorState == null) {
     			priorityQueueOperatorState = getPartitionedState(
    -					new ValueStateDescriptor<>(
    -						PRIORIRY_QUEUE_STATE_NAME,
    -						new PriorityQueueSerializer<>(
    -								streamRecordSerializer,
    -								new PriorityQueueStreamRecordFactory<IN>())));
    +				new ValueStateDescriptor<>(PRIORITY_QUEUE_STATE_NAME,
    +					new PriorityQueueSerializer<>(streamRecordSerializer, new PriorityQueueStreamRecordFactory<IN>())));
    --- End diff --
    
    If doing reformatting changes then please try to not change a consistent style. When breaking long parameter lists, imo, every parameter should be on a separate line and indented identically. This is not the case with `PRIORITY_QUEUE_STATE_NAME`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3375: [FLINK-5845] [cep] Unify keyed and non-keyed operators.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/3375
  
    R @tillrohrmann 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3375: [FLINK-5845] [cep] Unify keyed and non-keyed opera...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3375#discussion_r102462322
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---
    @@ -127,16 +134,36 @@
     					keySerializer,
     					nfaFactory));
     		} else {
    -			patternStream = inputStream.transform(
    +
    +			KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
    +			TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
    +
    +			patternStream = inputStream.keyBy(new NullByteKeySelector<T>()).transform(
     				"TimeoutCEPPatternOperator",
     				eitherTypeInformation,
    -				new TimeoutCEPPatternOperator<>(
    +				new TimeoutKeyedCEPPatternOperator<>(
     					inputSerializer,
     					isProcessingTime,
    +					keySelector,
    +					keySerializer,
     					nfaFactory
     				)).forceNonParallel();
     		}
     
     		return patternStream;
     	}
    +
    +	/**
    +	 * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
    +	 * @param <T>
    +	 */
    +	protected static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
    --- End diff --
    
    Duplicate code. Already exists in `AllWindowedStream`. Better to refactor the existing code to make it usable here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3375: [FLINK-5845] [cep] Unify keyed and non-keyed opera...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3375#discussion_r102462479
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---
    @@ -40,25 +40,27 @@
     public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, IN>> {
     	private static final long serialVersionUID = 5328573789532074581L;
     
    -	public KeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory) {
    +	public KeyedCEPPatternOperator(
    +		TypeSerializer<IN> inputSerializer,
    +		boolean isProcessingTime,
    +		KeySelector<IN, KEY> keySelector,
    +		TypeSerializer<KEY> keySerializer,
    +		NFACompiler.NFAFactory<IN> nfaFactory) {
    --- End diff --
    
    double indentation for method declarations is better because then one can more easily distinguish the parameter list from the body.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---