Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2017/02/24 13:10:46 UTC

[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/3412

    [FLINK-5420] [cep] Make the CEP operators rescalable

    More on the related JIRA: https://issues.apache.org/jira/browse/FLINK-5420

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink cep-uni-rescale

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3412
    
----
commit ee5862973d2b8b8dca8333eaaff6ca82a4e9a069
Author: kl0u <kk...@gmail.com>
Date:   2017-02-24T09:34:43Z

    [FLINK-5420] [cep] Make the CEP operators rescalable
    
    Introduces the KeyRegistry in the TimeServiceHandler,
    which allows one to specify a callback and to register
    the keys for which this callback should be invoked on
    each watermark.
    
    With this service, the CEP operator now has only keyed
    state, while the previously non-keyed state (the keys)
    is handled by the KeyRegistry.

----
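The mechanism described in the commit message (one callback, many registered keys, one invocation per key on every watermark) can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not the code from the pull request: `SimpleKeyRegistry` and `WatermarkCallback` are hypothetical names, the watermark is modeled as a plain `long`, and checkpointing of the registered keys is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class KeyRegistrySketch {

    /** Hypothetical callback type; a stand-in for the PR's callback interface, whose exact methods are not shown here. */
    interface WatermarkCallback<K> {
        void onWatermark(K key, long watermark);
    }

    /** Hypothetical registry: a single callback plus the set of keys it should be invoked for. */
    static final class SimpleKeyRegistry<K> {
        // Insertion-ordered so the invocation order is deterministic in this sketch.
        private final Set<K> registeredKeys = new LinkedHashSet<>();
        private WatermarkCallback<K> callback;

        void setCallback(WatermarkCallback<K> callback) {
            if (this.callback != null) {
                throw new IllegalStateException("A callback is already registered.");
            }
            this.callback = callback;
        }

        void registerKey(K key) {
            registeredKeys.add(key);
        }

        void unregisterKey(K key) {
            registeredKeys.remove(key);
        }

        /** Invokes the callback once for every key registered before this watermark arrived. */
        void onWatermark(long watermark) {
            if (callback == null) {
                return;
            }
            // Iterate over a copy so the callback may safely unregister keys.
            for (K key : new ArrayList<>(registeredKeys)) {
                callback.onWatermark(key, watermark);
            }
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        SimpleKeyRegistry<String> registry = new SimpleKeyRegistry<>();
        registry.setCallback((key, watermark) -> calls.add(key + "@" + watermark));
        registry.registerKey("a");
        registry.registerKey("b");
        registry.onWatermark(10L);
        registry.unregisterKey("a");
        registry.onWatermark(20L);
        System.out.println(calls); // [a@10, b@10, b@20]
    }
}
```

Because the registry, rather than the operator, owns the set of keys, the operator itself is left with only keyed state, which is the property the commit message relies on for rescaling.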


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3412: [FLINK-5420] [cep] Make the CEP operators rescalable

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/3412
  
    Thanks for the comments, @aljoscha. I integrated them and am waiting for Travis.


[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3412#discussion_r102939631
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -887,6 +864,30 @@ public void close() {
     	//  Watermark handling
     	// ------------------------------------------------------------------------
     
    +	public <K> void registerWatermarkCallback(KeyRegistry.OnWatermarkCallback<K> callback, TypeSerializer<K> keySerializer) {
    --- End diff --
    
    The key serialiser can be retrieved from `getKeyedStateBackend().getKeySerializer()`.


[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3412#discussion_r102940564
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -887,6 +864,30 @@ public void close() {
     	//  Watermark handling
     	// ------------------------------------------------------------------------
     
    +	public <K> void registerWatermarkCallback(KeyRegistry.OnWatermarkCallback<K> callback, TypeSerializer<K> keySerializer) {
    --- End diff --
    
    Come to think of it, we should probably only expose a method like `getInternalWatermarkCallbackService()`. (I discussed this offline with @kl0u)
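Taken together, the two review comments on `registerWatermarkCallback` (take the key serializer from the keyed state backend, and expose only an accessor for the service) could look roughly like the sketch below. Everything here is hypothetical and simplified: `KeyedBackend`, `KeySerializer`, `WatermarkCallbackService` and `Operator` are stand-ins, not Flink classes. Only the accessor name `getInternalWatermarkCallbackService()` and the idea of a backend-provided `getKeySerializer()` come from the discussion.

```java
import java.util.Objects;

public class CallbackServiceAccessorSketch {

    /** Hypothetical stand-in for a key serializer. */
    interface KeySerializer<K> {
        String describe();
    }

    /** Hypothetical stand-in for the keyed state backend, which already knows its key serializer. */
    static final class KeyedBackend<K> {
        private final KeySerializer<K> keySerializer;

        KeyedBackend(KeySerializer<K> keySerializer) {
            this.keySerializer = keySerializer;
        }

        KeySerializer<K> getKeySerializer() {
            return keySerializer;
        }
    }

    /** Hypothetical service; it would need the serializer to checkpoint its registered keys. */
    static final class WatermarkCallbackService<K> {
        final KeySerializer<K> keySerializer;

        WatermarkCallbackService(KeySerializer<K> keySerializer) {
            this.keySerializer = Objects.requireNonNull(keySerializer);
        }
    }

    /** Hypothetical operator base class: callers obtain the service and never pass a serializer. */
    static class Operator<K> {
        private final KeyedBackend<K> backend; // null for non-keyed operators
        private WatermarkCallbackService<K> callbackService;

        Operator(KeyedBackend<K> backend) {
            this.backend = backend;
        }

        WatermarkCallbackService<K> getInternalWatermarkCallbackService() {
            if (backend == null) {
                throw new UnsupportedOperationException("Only available to keyed operators.");
            }
            if (callbackService == null) {
                // The serializer comes from the backend instead of being a method parameter.
                callbackService = new WatermarkCallbackService<>(backend.getKeySerializer());
            }
            return callbackService;
        }
    }

    public static void main(String[] args) {
        KeyedBackend<String> backend = new KeyedBackend<>(() -> "StringSerializer");
        Operator<String> operator = new Operator<>(backend);
        WatermarkCallbackService<String> service = operator.getInternalWatermarkCallbackService();
        System.out.println(service.keySerializer.describe()); // StringSerializer
        System.out.println(service == operator.getInternalWatermarkCallbackService()); // true
    }
}
```

Exposing a single accessor keeps the operator's public surface small, and deriving the serializer from the backend removes a parameter that callers could otherwise get wrong.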


[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3412#discussion_r103471738
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +
    +import java.io.IOException;
    +
    +/**
    + * A callback registered with the {@link InternalWatermarkCallbackService} service. This callback will
    + * be invoked for all keys registered with the service, upon reception of a watermark.
    + */
    +public interface OnWatermarkCallback<KEY> {
    --- End diff --
    
    This should probably also be `@Internal`.


[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3412#discussion_r102940776
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimeServiceHandler.java ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupsList;
    +import org.apache.flink.runtime.state.VoidNamespaceSerializer;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * A handler keeping all the time-related services available to all operators extending the
    + * {@link AbstractStreamOperator}. These are the different {@link HeapInternalTimerService timer services}
    + * and the {@link KeyRegistry}.
    + *
    + * <b>NOTE:</b> These services are only available to keyed operators.
    + *
    + * @param <K> The type of keys used for the timers and the registry.
    + * @param <N> The type of namespace used for the timers.
    + */
    +public class TimeServiceHandler<K, N> {
    --- End diff --
    
    This should be package-private since it's only used by the `AbstractStreamOperator`. Maybe also mark it as `@Internal`, just to be safe.


[GitHub] flink issue #3412: [FLINK-5420] [cep] Make the CEP operators rescalable

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/3412
  
    Thanks for the comments, @aljoscha.


[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3412#discussion_r103471629
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.runtime.state.KeyGroupsList;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * The watermark callback service allows to register a {@link OnWatermarkCallback OnWatermarkCallback}
    + * and multiple keys, for which the callback will be invoked every time a new {@link Watermark} is received
    + * (after the registration of the key).
    + * <p>
    + * <b>NOTE: </b> This service is only available to <b>keyed</b> operators.
    + *
    + *  @param <K> The type of key returned by the {@code KeySelector}.
    + * */
    +public class InternalWatermarkCallbackService<K> {
    --- End diff --
    
    Slight Javadoc typo, and this should probably be `@Internal`.
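The `InternalWatermarkCallbackService` diff imports `KeyGroupRangeAssignment` and `KeyGroupsList`, which suggests that the registered keys are checkpointed per key group; that is what would let them be redistributed when the parallelism changes. The sketch below illustrates the idea under simplifying assumptions: keys are assigned to key groups with `Math.floorMod(key.hashCode(), maxParallelism)` (Flink additionally applies a murmur hash, omitted here), the range arithmetic is our reading of how Flink splits key groups into contiguous per-subtask ranges, and all method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class KeyGroupRescaleSketch {

    /** Simplified key-group assignment; Flink additionally murmur-hashes hashCode(), omitted here. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Inclusive, contiguous key-group range owned by one subtask (our reading of Flink's split). */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Snapshot: bucket the registered keys by key group so each bucket can move independently. */
    static Map<Integer, Set<String>> snapshotByKeyGroup(Set<String> keys, int maxParallelism) {
        Map<Integer, Set<String>> byGroup = new TreeMap<>();
        for (String key : keys) {
            byGroup.computeIfAbsent(keyGroupFor(key, maxParallelism), g -> new TreeSet<>()).add(key);
        }
        return byGroup;
    }

    /** Restore: a subtask keeps only the buckets whose key group falls into its range. */
    static Set<String> restoreFor(Map<Integer, Set<String>> snapshot,
                                  int maxParallelism, int parallelism, int operatorIndex) {
        int[] range = keyGroupRange(maxParallelism, parallelism, operatorIndex);
        Set<String> restored = new TreeSet<>();
        for (Map.Entry<Integer, Set<String>> entry : snapshot.entrySet()) {
            int group = entry.getKey();
            if (group >= range[0] && group <= range[1]) {
                restored.addAll(entry.getValue());
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        Set<String> keys = new TreeSet<>(Arrays.asList("a", "b", "c", "d", "e", "f"));
        Map<Integer, Set<String>> snapshot = snapshotByKeyGroup(keys, maxParallelism);
        // Restore with parallelism 3: every key ends up in exactly one subtask.
        for (int i = 0; i < 3; i++) {
            System.out.println(i + " -> " + restoreFor(snapshot, maxParallelism, 3, i));
        }
        // Prints 0 -> [a, b], 1 -> [c, d, e], 2 -> [f], one per line.
    }
}
```

Grouping by key group rather than by subtask is the design choice that matters here: the snapshot layout does not depend on the parallelism it was taken with, so any new parallelism up to `maxParallelism` can pick up whole buckets.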


[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3412#discussion_r102940901
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyRegistry.java ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.runtime.state.KeyGroupsList;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * The key registry allows to register a {@link OnWatermarkCallback} and multiple keys, for which
    + * the callback will be invoked periodically, upon reception of each subsequent {@link Watermark},
    + * after the registration of the key.
    + * <p>
    + * <b>NOTE: </b> This service is only available to <b>keyed</b> operators.
    + *
    + *  @param <K> The type of key returned by the {@code KeySelector}.
    + * */
    +public class KeyRegistry<K> {
    --- End diff --
    
    To keep it in line with the other services, this could be called `InternalWatermarkCallbackService`.


[GitHub] flink pull request #3412: [FLINK-5420] [cep] Make the CEP operators rescalab...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/3412
