Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2016/04/18 18:12:36 UTC

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/1905

    [FLINK-3708] Scala API for CEP (initial).

    
    This module, flink-cep-scala, adds a Scala counterpart to the Java CEP API in Flink.
    
    I created Scala classes for Pattern and PatternStream, along with helper classes and corresponding tests.
    
    PatternStream in flink-cep is extended so that explicit TypeInformation can be passed in from Scala code.
    Pattern in flink-cep is slightly modified in the type parameters of its previous member.
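    
    For a quick impression, a pattern definition with the Scala API would look roughly like this; Event and SubEvent stand in for user-defined event types, and begin is assumed to mirror Pattern.begin from the Java API:
    
    ```
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.streaming.api.windowing.time.Time
    
    // Event and SubEvent are placeholder user event types
    val pattern = Pattern.begin[Event]("start")
      .next("middle").subtype(classOf[SubEvent])
      .followedBy("end").where(_.name == "end")
      .within(Time.seconds(10))
    ```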

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink dev-cep-scala

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1905.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1905
    
----
commit 306210da82c94f6770e22690799a3f1af9a877e7
Author: Stefan Richter <st...@gmail.com>
Date:   2016-04-18T11:10:09Z

    Preparations in Pattern and PatternStream of flink-cep to support flink-cep-scala functionality.

commit 731d96ae78c4dc1b7125966900886225374a3eb5
Author: Stefan Richter <st...@gmail.com>
Date:   2016-04-18T11:10:35Z

    Initial commit for flink-cep-scala API.

commit 7da7c14787226d9aa5914a138f632b8b6d5f1335
Author: Stefan Richter <st...@gmail.com>
Date:   2016-04-18T13:48:51Z

    [FLINK-3708] Added missing test dependency to pom.

commit 1468bd91aea7d2a757ddeea8b8506b5ad9f3d289
Author: Stefan Richter <st...@gmail.com>
Date:   2016-04-18T14:50:53Z

    [FLINK-3708] Pattern in Scala API now uses Option to shield users against null values from the Java API

commit 9ad8719c5b8767ec2fb0425f55cc7df105e36bcb
Author: Stefan Richter <st...@gmail.com>
Date:   2016-04-18T16:04:35Z

    [FLINK-3708] Added test-jar for build phase to pom.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60383831
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    +  }
    +
    +  /**
    +    *
    +    * @return Filter condition for an event to be matched
    +    */
    +  def getFilterFunction: Option[FilterFunction[F]] = {
    +    val filterFun = jPattern.getFilterFunction
    +    if (filterFun == null) None else Some(filterFun)
    +  }
    +
    +  /**
    +    * Applies a subtype constraint on the current pattern operator. This means that an event has
    +    * to be of the given subtype in order to be matched.
    +    *
    +    * @param clazz Class of the subtype
    +    * @tparam S Type of the subtype
    +    * @return The same pattern operator with the new subtype constraint
    +    */
    +  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
    +    jPattern.subtype(clazz)
    +    this.asInstanceOf[Pattern[T, S]]
    +  }
    +
    +  /**
    +    * Defines the maximum time interval for a matching pattern. This means that the time gap
    +    * between first and the last event must not be longer than the window time.
    +    *
    +    * @param windowTime Time of the matching window
    +    * @return The same pattern operator with the new window length
    +    */
    +  def within(windowTime: Time): Pattern[T, F] = {
    +    jPattern.within(windowTime)
    +    this
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces strict
    +    * temporal contiguity. This means that the whole pattern only matches if an event which matches
    +    * this operator directly follows the preceding matching event. Thus, there cannot be any
    +    * events in between two matching events.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def next(name: String): Pattern[T, T] = {
    +    wrapPattern(jPattern.next(name))
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces
    +    * non-strict temporal contiguity. This means that a matching event of this operator and the
    +    * preceding matching event might be interleaved with other events which are ignored.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def followedBy(name: String): FollowedByPattern[T, T] = {
    +    FollowedByPattern(jPattern.followedBy(name))
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filter Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filter: FilterFunction[F]): Pattern[T, F] = {
    +    jPattern.where(filter)
    +    this
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filterFun Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filterFun: F => Boolean): Pattern[T, F] = {
    +    val filter = new FilterFunction[F] {
    +      val cleanFilter = cep.scala.cleanClosure(filterFun)
    +
    +      override def filter(value: F): Boolean = cleanFilter(value)
    +    }
    +    where(filter)
    +  }
    +
    +  //TODO ask about java api change <?> -> <? extends T> and creating a new object vs caching object. equals/hashcode?
    +  /**
    +    *
    +    * @return The previous pattern operator
    +    */
    +  def getPrevious: Option[Pattern[T, _ <: T]] = {
    +    val prev = jPattern.getPrevious
    +    if (prev == null) None else Some(wrapPattern(prev))
    +
    +  }
    +
    +}
    +
    +object Pattern {
    +
    +  /**
    +    * Constructs a new Pattern by wrapping a given Java API Pattern
    +    *
    +    * @param jPattern Underlying Java API Pattern.
    +    * @tparam T Base type of the elements appearing in the pattern
    +    * @tparam F Subtype of T to which the current pattern operator is constrained
    +    * @return New wrapping Pattern object
    +    */
    +  def apply[T: ClassTag, F <: T : ClassTag]
    +  (jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern)
    --- End diff --
    
    Does that fit in one line?



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60403142
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/Event.scala ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.api.common.ExecutionConfig
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeutils.TypeSerializer
    +import org.apache.flink.api.java.typeutils.TypeExtractor
    +
    +object Event {
    +  def createTypeSerializer: TypeSerializer[Event] = {
    +    val typeInformation: TypeInformation[Event] = TypeExtractor.createTypeInfo(classOf[Event])
    +    return typeInformation.createSerializer(new ExecutionConfig)
    +  }
    +}
    +
    +class Event(var id: Int, var name: String, var price: Double) {
    --- End diff --
    
    Why not simply reuse the Java `Event` class from the flink-cep test jar?



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60383389
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    +  }
    +
    +  /**
    +    *
    +    * @return Filter condition for an event to be matched
    +    */
    +  def getFilterFunction: Option[FilterFunction[F]] = {
    +    val filterFun = jPattern.getFilterFunction
    +    if (filterFun == null) None else Some(filterFun)
    --- End diff --
    
    Same here



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-214688703
  
    All mentioned issues should be addressed now and the PR is ready.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60925045
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction,
    +PatternStream => JPatternStream}
    +import org.apache.flink.streaming.api.scala.DataStream
    +import org.apache.flink.streaming.api.scala.asScalaStream
    +import org.apache.flink.util.Collector
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +/**
    +  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
    +  * pattern sequences as a map of events associated with their names. The pattern is detected using
    +  * a [[org.apache.flink.cep.nfa.NFA]]. In order to process the detected sequences, the user has to
    +  * specify a [[PatternSelectFunction]] or a [[PatternFlatSelectFunction]].
    +  *
    +  * @param jPatternStream Underlying pattern stream from Java API
    +  * @tparam T Type of the events
    +  */
    +class PatternStream[T: TypeInformation](jPatternStream: JPatternStream[T]) {
    --- End diff --
    
    `TypeInformation` is not needed



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60925702
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---
    @@ -99,9 +116,25 @@
     			null,
     			false);
     
    +		return flatSelect(patternFlatSelectFunction, outTypeInfo);
    +	}
    +
    +	/**
    +	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    +	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
    +	 * can produce an arbitrary number of resulting elements.
    +	 *
    +	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
    +	 *                                  detected pattern sequence.
    +	 * @param <R> Typ of the resulting elements
    +	 * @param outTypeInfo Explicit specification of output type.
    +	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
    +	 *         function.
    +	 */
    +	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
     		return patternStream.flatMap(
    -			new PatternFlatSelectMapper<T, R>(
    -				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
    +			new PatternFlatSelectMapper<>(
    +					patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
    --- End diff --
    
    Indentation is off here



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60398456
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-clients_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +    </dependencies>
    +
    +    <build>
    +        <plugins>
    +            <!-- Scala Compiler -->
    +            <plugin>
    +                <groupId>net.alchim31.maven</groupId>
    +                <artifactId>scala-maven-plugin</artifactId>
    +                <version>3.1.4</version>
    +                <executions>
    +                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
    +                        scala classes can be resolved later in the (Java) compile phase -->
    +                    <execution>
    +                        <id>scala-compile-first</id>
    +                        <phase>process-resources</phase>
    +                        <goals>
    +                            <goal>compile</goal>
    +                        </goals>
    +                    </execution>
    +
    +                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
    +                         scala classes can be resolved later in the (Java) test-compile phase -->
    +                    <execution>
    +                        <id>scala-test-compile</id>
    +                        <phase>process-test-resources</phase>
    +                        <goals>
    +                            <goal>testCompile</goal>
    +                        </goals>
    +                    </execution>
    +                </executions>
    +                <configuration>
    +                    <jvmArgs>
    +                        <jvmArg>-Xms128m</jvmArg>
    +                        <jvmArg>-Xmx512m</jvmArg>
    +                    </jvmArgs>
    +                    <compilerPlugins combine.children="append">
    +                        <compilerPlugin>
    +                            <groupId>org.scalamacros</groupId>
    +                            <artifactId>paradise_${scala.version}</artifactId>
    +                            <version>${scala.macros.version}</version>
    +                        </compilerPlugin>
    +                    </compilerPlugins>
    +                </configuration>
    +            </plugin>
    --- End diff --
    
    I think you can replace this with
    
    ```
    <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
            <execution>
                <goals>
                    <goal>compile</goal>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
        <configuration>
            <sourceDir>src/main/scala</sourceDir>
            <testSourceDir>src/test/scala</testSourceDir>
            <jvmArgs>
                <jvmArg>-Xms64m</jvmArg>
                <jvmArg>-Xmx1024m</jvmArg>
            </jvmArgs>
        </configuration>
    </plugin>
    ```
    
    Since we don't have a mixed Java/Scala project here.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1905



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-211819580
  
    Thanks Ufuk! I just saw that the build actually failed, and it seems the problem is indeed related to removing the asm dependency from the pom.xml, as Robert suggested. Should I add the dependency back, or do you see another way to solve this?
    
    There is one remaining question, which I already started to discuss with Till, about the select functions in PatternStream. The Java API calls them with java.util.Map parameters, but I don't think we want Java classes in our Scala API. Currently I see two options for changing this without touching the Java API:
    
    1) Use Scala mutable.Map and automatic conversion.
    2) Use Scala immutable.Map and explicit conversion. From how I interpret the actual use of the Map in this function, the semantics are actually that of an immutable map.
    
    If the impact of the conversion is not an issue, I would go with version 2. I have already implemented it, but it is currently commented out in favor of a version that still uses the Java Map.
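    
    For illustration, version 2 would look roughly like this inside PatternStream[T]; the explicit asScala.toMap conversion is essentially the only difference from the commented-out draft:
    
    ```
    import java.util.{Map => JMap}
    import scala.collection.JavaConverters._
    import scala.reflect.ClassTag
    
    def select[R: TypeInformation : ClassTag](patternSelectFun: Map[String, T] => R): DataStream[R] = {
      val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
        val cleanFun = cleanClosure(patternSelectFun)
    
        // copy the java.util.Map into an immutable Scala Map before calling the user function
        override def select(in: JMap[String, T]): R = cleanFun(in.asScala.toMap)
      }
      select(patternSelectFunction)
    }
    ```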




[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60452526
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
    +
    +import _root_.scala.reflect.ClassTag
    +
    +package object pattern {
    +  /**
    +    * Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
    +    * for usage with the Scala API.
    +    *
    +    * @param javaPattern The underlying pattern from the Java API
    +    * @tparam T Base type of the elements appearing in the pattern
    +    * @tparam F Subtype of T to which the current pattern operator is constrained
    +    * @return A pattern from the Scala API which wraps the pattern from the Java API
    +    */
    +  private[flink] def wrapPattern[
    +  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
    +  : Pattern[T, F] = javaPattern match {
    +    case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
    +    case p: JPattern[T, F] => Pattern[T, F](p)
    +    case _ => null
    --- End diff --
    
    I am not sure if this case should trigger an exception, because the Java API is actually allowed to return null here when there is no previous pattern. Our Pattern will then wrap this as an empty Option (None).



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60924213
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
    +
    +import scala.reflect.ClassTag
    +
    +package object pattern {
    +  /**
    +    * Utility method to wrap [[org.apache.flink.cep.pattern.Pattern]] and its subclasses
    +    * for usage with the Scala API.
    +    *
    +    * @param javaPattern The underlying pattern from the Java API
    +    * @tparam T Base type of the elements appearing in the pattern
    +    * @tparam F Subtype of T to which the current pattern operator is constrained
    +    * @return A pattern from the Scala API which wraps the pattern from the Java API
    +    */
    +  private[flink] def wrapPattern[T: ClassTag, F <: T](javaPattern: JPattern[T, F])
    --- End diff --
    
    `ClassTag` should not be necessary if `FollowedByPattern` and `Pattern` are adapted accordingly.
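    
    Roughly like this, assuming `Pattern`, `FollowedByPattern`, and their companion `apply` methods drop the `ClassTag` bounds as well:
    
    ```
    private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F]): Pattern[T, F] =
      javaPattern match {
        case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
        case p: JPattern[T, F] => Pattern[T, F](p)
        case _ => null // the Java API may hand in null when there is no previous pattern
      }
    ```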



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60402382
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    +  }
    +
    +  /**
    +    *
    +    * @return Filter condition for an event to be matched
    +    */
    +  def getFilterFunction: Option[FilterFunction[F]] = {
    +    val filterFun = jPattern.getFilterFunction
    +    if (filterFun == null) None else Some(filterFun)
    +  }
    +
    +  /**
    +    * Applies a subtype constraint on the current pattern operator. This means that an event has
    +    * to be of the given subtype in order to be matched.
    +    *
    +    * @param clazz Class of the subtype
    +    * @tparam S Type of the subtype
    +    * @return The same pattern operator with the new subtype constraint
    +    */
    +  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
    +    jPattern.subtype(clazz)
    +    this.asInstanceOf[Pattern[T, S]]
    +  }
    +
    +  /**
    +    * Defines the maximum time interval for a matching pattern. This means that the time gap
    +    * between first and the last event must not be longer than the window time.
    +    *
    +    * @param windowTime Time of the matching window
    +    * @return The same pattern operator with the new window length
    +    */
    +  def within(windowTime: Time): Pattern[T, F] = {
    +    jPattern.within(windowTime)
    +    this
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces strict
    +    * temporal contiguity. This means that the whole pattern only matches if an event which matches
    +    * this operator directly follows the preceding matching event. Thus, there cannot be any
    +    * events in between two matching events.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def next(name: String): Pattern[T, T] = {
    +    wrapPattern(jPattern.next(name))
    --- End diff --
    
    Could be done more efficiently with `Pattern[T, T](jPattern.next(name))`.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60401470
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    --- End diff --
    
    Code examples in ScalaDocs use `{{{ code }}}`
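    
    For example, the header above could look like this in ScalaDoc form (the snippet is just the Java example carried over verbatim):
    
    ```
    /**
      * Base class for a pattern definition.
      *
      * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
      * a [[org.apache.flink.cep.nfa.NFA]].
      *
      * {{{
      * Pattern<T, F> pattern = Pattern.<T>begin("start")
      *   .next("middle").subtype(F.class)
      *   .followedBy("end").where(new MyFilterFunction());
      * }}}
      */
    ```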



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-213443445
  
    The current version should cover all of your feedback and could be pulled, @tillrohrmann.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-211784417
  
    Thank you for this great contribution! I think that CEP and a concise Scala API go very well together. :-) I'm not a Scala expert, but I'll also have a look later.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60399432
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream}
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.util.Collector
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
    +  * pattern sequences as a map of events associated with their names. The pattern is detected using a
    +  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
    +  * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}.
    +  *
    +  * @param jPatternStream Underlying pattern stream from Java API
    +  * @tparam T Type of the events
    +  */
    +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) {
    +
    +  private[flink] def getWrappedPatternStream = jPatternStream
    +
    +  /**
    +    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    +    * provided { @link PatternSelectFunction} is called. The pattern select function can produce
    +    * exactly one resulting element.
    +    *
    +    * @param patternSelectFunction The pattern select function which is called for each detected
    +    *                              pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern select
    +    *         unction.
    +    */
    +  def select[R: TypeInformation : ClassTag](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R] = {
    +    asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]]))
    +  }
    +
    +  /**
    +    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    +    * the provided { @link PatternFlatSelectFunction} is called. The pattern flat select function
    +    * can produce an arbitrary number of resulting elements.
    +    *
    +    * @param patternFlatSelectFunction The pattern flat select function which is called for each
    +    *                                  detected pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern flat select
    +    *         function.
    +    */
    +  def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFunction: PatternFlatSelectFunction[T, R]): DataStream[R] = {
    +    asScalaStream(jPatternStream.flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
    +  }
    +
    +  /**
    +    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    +    * provided { @link PatternSelectFunction} is called. The pattern select function can produce
    +    * exactly one resulting element.
    +    *
    +    * @param patternSelectFun The pattern select function which is called for each detected
    +    *                         pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern select
    +    *         function.
    +    */
    +  def select[R: TypeInformation : ClassTag](patternSelectFun: JMap[String, T] => R): DataStream[R] = {
    +    val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
    +      val cleanFun = cleanClosure(patternSelectFun)
    +
    +      def select(in: JMap[String, T]): R = cleanFun(in)
    +    }
    +    select(patternSelectFunction)
    +  }
    +
    +  //  def select[R: TypeInformation : ClassTag](patternSelectFun: Map[String, T] => R): DataStream[R] = {
    +  //    val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
    +  //      val cleanFun = cleanClosure(patternSelectFun)
    +  //
    +  //      def select(in: JMap[String, T]): R = cleanFun(in.toMap)
    +  //    }
    +  //    select(patternSelectFunction)
    +  //  }
    +
    +  //TODO ask about implicit conversion between Java Map and Scala Map
    +  /**
    +    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    +    * the provided { @link PatternFlatSelectFunction} is called. The pattern flat select function
    +    * can produce an arbitrary number of resulting elements.
    +    *
    +    * @param patternFlatSelectFun The pattern flat select function which is called for each
    +    *                             detected pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern flat select
    +    *         function.
    +    */
    +  def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFun: (JMap[String, T], Collector[R]) => Unit): DataStream[R] = {
    +    val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] = new PatternFlatSelectFunction[T, R] {
    +      val cleanFun = cleanClosure(patternFlatSelectFun)
    +
    +      def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit = cleanFun(pattern, out)
    +    }
    +    flatSelect(patternFlatSelectFunction)
    +  }
    +
    +  //  def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFun: (Map[String, T], Collector[R]) => Unit): DataStream[R] = {
    +  //    val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] = new PatternFlatSelectFunction[T, R] {
    +  //      val cleanFun = cleanClosure(patternFlatSelectFun)
    +  //
    +  //      def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit = cleanFun(pattern.toMap, out)
    +  //    }
    +  //    flatSelect(patternFlatSelectFunction)
    +  //  }
    +
    +}
    +
    +object PatternStream {
    +  //TODO ask construction
    +  /**
    +    *
    +    * @param jPatternStream Underlying pattern stream from Java API
    +    * @tparam T Type of the events
    +    * @return A new pattern stream wrapping the pattern stream from Java APU
    +    */
    +  def apply[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) = {
    +    new PatternStream[T](jPatternStream)
    +  }
    +}
    --- End diff --
    
    `ClassTags` are not needed for functions.
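    
    E.g. for `select` this is just the current code without the `ClassTag` bound:
    
    ```
    def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R] = {
      asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]]))
    }
    ```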



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60383704
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    +  }
    +
    +  /**
    +    *
    +    * @return Filter condition for an event to be matched
    +    */
    +  def getFilterFunction: Option[FilterFunction[F]] = {
    +    val filterFun = jPattern.getFilterFunction
    +    if (filterFun == null) None else Some(filterFun)
    +  }
    +
    +  /**
    +    * Applies a subtype constraint on the current pattern operator. This means that an event has
    +    * to be of the given subtype in order to be matched.
    +    *
    +    * @param clazz Class of the subtype
    +    * @tparam S Type of the subtype
    +    * @return The same pattern operator with the new subtype constraint
    +    */
    +  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
    +    jPattern.subtype(clazz)
    +    this.asInstanceOf[Pattern[T, S]]
    +  }
    +
    +  /**
    +    * Defines the maximum time interval for a matching pattern. This means that the time gap
    +    * between first and the last event must not be longer than the window time.
    +    *
    +    * @param windowTime Time of the matching window
    +    * @return The same pattern operator with the new window length
    +    */
    +  def within(windowTime: Time): Pattern[T, F] = {
    +    jPattern.within(windowTime)
    +    this
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces strict
    +    * temporal contiguity. This means that the whole pattern only matches if an event which matches
    +    * this operator directly follows the preceding matching event. Thus, there cannot be any
    +    * events in between two matching events.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def next(name: String): Pattern[T, T] = {
    +    wrapPattern(jPattern.next(name))
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces
    +    * non-strict temporal contiguity. This means that a matching event of this operator and the
    +    * preceding matching event might be interleaved with other events which are ignored.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def followedBy(name: String): FollowedByPattern[T, T] = {
    +    FollowedByPattern(jPattern.followedBy(name))
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filter Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filter: FilterFunction[F]): Pattern[T, F] = {
    +    jPattern.where(filter)
    +    this
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filterFun Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filterFun: F => Boolean): Pattern[T, F] = {
    +    val filter = new FilterFunction[F] {
    +      val cleanFilter = cep.scala.cleanClosure(filterFun)
    +
    +      override def filter(value: F): Boolean = cleanFilter(value)
    +    }
    +    where(filter)
    +  }
    +
    +  //TODO ask about java api change <?> -> <? extends T> and creating a new object vs caching object. equals/hashcode?
    +  /**
    +    *
    +    * @return The previous pattern operator
    +    */
    +  def getPrevious: Option[Pattern[T, _ <: T]] = {
    +    val prev = jPattern.getPrevious
    +    if (prev == null) None else Some(wrapPattern(prev))
    --- End diff --
    
    `Option(jPattern.getPrevious)`
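    
    For illustration, a minimal, self-contained sketch of the `Option(...)` idiom suggested here (object and method names are made up): `Option(x)` yields `None` for a `null` Java result and `Some(x)` otherwise, so the explicit `null` check can be dropped.
    
    ```
    object OptionWrappingSketch {
      // Option(x) absorbs the null check: null => None, non-null => Some(x).
      def describe(maybeNull: String): String =
        Option(maybeNull).map(s => s"got: $s").getOrElse("got nothing")

      def main(args: Array[String]): Unit = {
        println(describe(null))    // prints "got nothing"
        println(describe("value")) // prints "got: value"
      }
    }
    ```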



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60393928
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---
    @@ -99,10 +116,26 @@
     			null,
     			false);
     
    +		return flatSelect(patternFlatSelectFunction, outTypeInfo);
    +	}
    +
    +	/**
    +	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    +	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
    +	 * can produce an arbitrary number of resulting elements.
    +	 *
    +	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
    +	 *                                  detected pattern sequence.
    +	 * @param <R> Typ of the resulting elements
    +	 * @param outTypeInfo Explicit specification of output type.
    +	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
    +	 *         function.
    +	 */
    +	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
     		return patternStream.flatMap(
    -			new PatternFlatSelectMapper<T, R>(
    -				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
    -			)).returns(outTypeInfo);
    +				new PatternFlatSelectMapper<T, R>(
    +						patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
    +				)).returns(outTypeInfo);
     	}
    --- End diff --
    
    Indentation off in this file.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60382954
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    --- End diff --
    
    Two consecutive blank lines here; one is enough.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r61057258
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  *
    +  * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
    +  * a [[org.apache.flink.cep.nfa.NFA]].
    +  *
    +  * {{{
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * }}}
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    --- End diff --
    
    Ok, I truly wasn't aware of that. But it makes sense of course :-)



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-211840791
  
    Does anybody know why we needed to add the asm dependency for gelly-scala? @tillrohrmann @vasia?



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60401939
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    +  }
    +
    +  /**
    +    *
    +    * @return Filter condition for an event to be matched
    +    */
    +  def getFilterFunction: Option[FilterFunction[F]] = {
    +    val filterFun = jPattern.getFilterFunction
    +    if (filterFun == null) None else Some(filterFun)
    +  }
    +
    +  /**
    +    * Applies a subtype constraint on the current pattern operator. This means that an event has
    +    * to be of the given subtype in order to be matched.
    +    *
    +    * @param clazz Class of the subtype
    +    * @tparam S Type of the subtype
    +    * @return The same pattern operator with the new subtype constraint
    +    */
    +  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
    +    jPattern.subtype(clazz)
    +    this.asInstanceOf[Pattern[T, S]]
    +  }
    +
    +  /**
    +    * Defines the maximum time interval for a matching pattern. This means that the time gap
    +    * between first and the last event must not be longer than the window time.
    +    *
    +    * @param windowTime Time of the matching window
    +    * @return The same pattern operator with the new window length
    +    */
    +  def within(windowTime: Time): Pattern[T, F] = {
    +    jPattern.within(windowTime)
    +    this
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces strict
    +    * temporal contiguity. This means that the whole pattern only matches if an event which matches
    +    * this operator directly follows the preceding matching event. Thus, there cannot be any
    +    * events in between two matching events.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def next(name: String): Pattern[T, T] = {
    +    wrapPattern(jPattern.next(name))
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces
    +    * non-strict temporal contiguity. This means that a matching event of this operator and the
    +    * preceding matching event might be interleaved with other events which are ignored.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def followedBy(name: String): FollowedByPattern[T, T] = {
    +    FollowedByPattern(jPattern.followedBy(name))
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filter Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filter: FilterFunction[F]): Pattern[T, F] = {
    +    jPattern.where(filter)
    +    this
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filterFun Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filterFun: F => Boolean): Pattern[T, F] = {
    +    val filter = new FilterFunction[F] {
    +      val cleanFilter = cep.scala.cleanClosure(filterFun)
    +
    +      override def filter(value: F): Boolean = cleanFilter(value)
    +    }
    +    where(filter)
    +  }
    +
    +  //TODO ask about java api change <?> -> <? extends T> and creating a new object vs caching object. equals/hashcode?
    --- End diff --
    
    The change to `? extends T` is good.
    
    What do you mean by creating a new object vs. caching the object?



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60404836
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.cep.pattern.SubtypeFilterFunction
    +import org.apache.flink.cep.scala._
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +class PatternTest {
    --- End diff --
    
    I think that we shouldn't simply copy the Java `PatternTest` here.
    
    It would be better to test that the Scala `Pattern` is correctly represented by the underlying Java `Pattern`, because that representation is ultimately used to compile the `NFA`.
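    
    One possible shape for such a test, sketched under the assumption that the `Pattern.begin`/`followedBy` methods and the package-private `getWrappedPattern` accessor stay as shown in the diffs of this PR (the class and method names below are illustrative only):
    
    ```
    package org.apache.flink.cep.scala.pattern

    import org.junit.Assert._
    import org.junit.Test

    class ScalaToJavaPatternSketchTest {

      @Test
      def scalaWrapperBuildsExpectedJavaPattern(): Unit = {
        // Build the pattern through the Scala API ...
        val pattern = Pattern.begin[AnyRef]("start").followedBy("end")

        // ... and assert on the wrapped Java pattern, which is what the NFACompiler sees.
        val jPattern = pattern.getWrappedPattern
        assertEquals("end", jPattern.getName)
        assertEquals("start", jPattern.getPrevious.getName)
      }
    }
    ```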



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60542216
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
    +
    +import _root_.scala.reflect.ClassTag
    +
    +package object pattern {
    +  /**
    +    * Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
    +    * for usage with the Scala API.
    +    *
    +    * @param javaPattern The underlying pattern from the Java API
    +    * @tparam T Base type of the elements appearing in the pattern
    +    * @tparam F Subtype of T to which the current pattern operator is constrained
    +    * @return A pattern from the Scala API which wraps the pattern from the Java API
    +    */
    +  private[flink] def wrapPattern[
    +  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
    +  : Pattern[T, F] = javaPattern match {
    +    case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
    +    case p: JPattern[T, F] => Pattern[T, F](p)
    +    case _ => null
    --- End diff --
    
    Oh yes, you're totally right, Stefan. But then we could maybe return an `Option[Pattern]` from the `wrapPattern` method.
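    
    A sketch of that variant, assuming the `JPattern`/`JFollowedByPattern` aliases from the package object above (and with the `ClassTag` bounds dropped, as discussed elsewhere in this PR):
    
    ```
    private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F])
      : Option[Pattern[T, F]] = javaPattern match {
      case f: JFollowedByPattern[T, F] => Some(FollowedByPattern[T, F](f))
      case p: JPattern[T, F] => Some(Pattern[T, F](p))
      case _ => None // a null input does not match the typed cases and ends up here
    }
    ```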



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60103959
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,216 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <!-- We need to add this explicitly because through shading the dependency on asm seems
    +        to go away. -->
    --- End diff --
    
    I initially took the pom.xml from gelly-scala as a blueprint and adopted that line from there; I didn't run into any particular problem myself.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60397042
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-clients_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    --- End diff --
    
    Not needed



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60398671
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-clients_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +    </dependencies>
    +
    +    <build>
    +        <plugins>
    +            <!-- Scala Compiler -->
    +            <plugin>
    +                <groupId>net.alchim31.maven</groupId>
    +                <artifactId>scala-maven-plugin</artifactId>
    +                <version>3.1.4</version>
    +                <executions>
    +                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
    +                        scala classes can be resolved later in the (Java) compile phase -->
    +                    <execution>
    +                        <id>scala-compile-first</id>
    +                        <phase>process-resources</phase>
    +                        <goals>
    +                            <goal>compile</goal>
    +                        </goals>
    +                    </execution>
    +
    +                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
    +                         scala classes can be resolved later in the (Java) test-compile phase -->
    +                    <execution>
    +                        <id>scala-test-compile</id>
    +                        <phase>process-test-resources</phase>
    +                        <goals>
    +                            <goal>testCompile</goal>
    +                        </goals>
    +                    </execution>
    +                </executions>
    +                <configuration>
    +                    <jvmArgs>
    +                        <jvmArg>-Xms128m</jvmArg>
    +                        <jvmArg>-Xmx512m</jvmArg>
    +                    </jvmArgs>
    +                    <compilerPlugins combine.children="append">
    +                        <compilerPlugin>
    +                            <groupId>org.scalamacros</groupId>
    +                            <artifactId>paradise_${scala.version}</artifactId>
    +                            <version>${scala.macros.version}</version>
    +                        </compilerPlugin>
    +                    </compilerPlugins>
    +                </configuration>
    +            </plugin>
    +
    +            <!-- Eclipse Integration -->
    +            <plugin>
    +                <groupId>org.apache.maven.plugins</groupId>
    +                <artifactId>maven-eclipse-plugin</artifactId>
    +                <version>2.8</version>
    +                <configuration>
    +                    <downloadSources>true</downloadSources>
    +                    <projectnatures>
    +                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
    +                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
    +                    </projectnatures>
    +                    <buildcommands>
    +                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
    +                    </buildcommands>
    +                    <classpathContainers>
    +                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
    +                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
    +                    </classpathContainers>
    +                    <excludes>
    +                        <exclude>org.scala-lang:scala-library</exclude>
    +                        <exclude>org.scala-lang:scala-compiler</exclude>
    +                    </excludes>
    +                    <sourceIncludes>
    +                        <sourceInclude>**/*.scala</sourceInclude>
    +                        <sourceInclude>**/*.java</sourceInclude>
    +                    </sourceIncludes>
    +                </configuration>
    +            </plugin>
    +
    +            <!-- Adding scala source directories to build path -->
    +            <plugin>
    +                <groupId>org.codehaus.mojo</groupId>
    +                <artifactId>build-helper-maven-plugin</artifactId>
    +                <version>1.7</version>
    +                <executions>
    +                    <!-- Add src/main/scala to eclipse build path -->
    +                    <execution>
    +                        <id>add-source</id>
    +                        <phase>generate-sources</phase>
    +                        <goals>
    +                            <goal>add-source</goal>
    +                        </goals>
    +                        <configuration>
    +                            <sources>
    +                                <source>src/main/scala</source>
    +                            </sources>
    +                        </configuration>
    +                    </execution>
    +                    <!-- Add src/test/scala to eclipse build path -->
    +                    <execution>
    +                        <id>add-test-source</id>
    +                        <phase>generate-test-sources</phase>
    +                        <goals>
    +                            <goal>add-test-source</goal>
    +                        </goals>
    +                        <configuration>
    +                            <sources>
    +                                <source>src/test/scala</source>
    +                            </sources>
    +                        </configuration>
    +                    </execution>
    +                </executions>
    +            </plugin>
    --- End diff --
    
    I don't know how relevant this is anymore.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60400970
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.ClosureCleaner
    +import org.apache.flink.cep.{PatternStream => JPatternStream}
    +
    +import _root_.scala.reflect.ClassTag
    +
    +package object scala {
    +
    +  /**
    +    * Utility method to wrap { @link org.apache.flink.cep.PatternStream}
    +    * for usage with the Scala API.
    +    *
    +    * @param javaPatternStream The underlying pattern stream from the Java API
    +    * @tparam T Type of the events
    +    * @return A pattern stream from the Scala API which wraps a pattern stream from the Java API
    +    */
    +  private[flink] def wrapPatternStream[T: TypeInformation : ClassTag](javaPatternStream: JPatternStream[T])
    +  : scala.PatternStream[T] = {
    +    javaPatternStream match {
    +      case p: JPatternStream[T] => PatternStream[T](p)
    +      case _ => null
    --- End diff --
    
    I think we should throw an `Exception` if we encounter a `null` value here. The reason is that the subsequent operations would otherwise throw a `NullPointerException`. Moreover, there is no good strategy for continuing with a `null` pattern stream.
    
    The most idiomatic Scala way would be:
    
    ```
    Option(javaPatternStream) match {
      case Some(p) => PatternStream[T](p)
      case None => throw new Exception(...)
    }
    ```



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60401794
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    +  }
    +
    +  /**
    +    *
    +    * @return Filter condition for an event to be matched
    +    */
    +  def getFilterFunction: Option[FilterFunction[F]] = {
    +    val filterFun = jPattern.getFilterFunction
    +    if (filterFun == null) None else Some(filterFun)
    +  }
    +
    +  /**
    +    * Applies a subtype constraint on the current pattern operator. This means that an event has
    +    * to be of the given subtype in order to be matched.
    +    *
    +    * @param clazz Class of the subtype
    +    * @tparam S Type of the subtype
    +    * @return The same pattern operator with the new subtype constraint
    +    */
    +  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
    +    jPattern.subtype(clazz)
    +    this.asInstanceOf[Pattern[T, S]]
    +  }
    +
    +  /**
    +    * Defines the maximum time interval for a matching pattern. This means that the time gap
    +    * between first and the last event must not be longer than the window time.
    +    *
    +    * @param windowTime Time of the matching window
    +    * @return The same pattern operator with the new window length
    +    */
    +  def within(windowTime: Time): Pattern[T, F] = {
    +    jPattern.within(windowTime)
    +    this
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces strict
    +    * temporal contiguity. This means that the whole pattern only matches if an event which matches
    +    * this operator directly follows the preceding matching event. Thus, there cannot be any
    +    * events in between two matching events.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def next(name: String): Pattern[T, T] = {
    +    wrapPattern(jPattern.next(name))
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern operator enforces
    +    * non-strict temporal contiguity. This means that a matching event of this operator and the
    +    * preceding matching event might be interleaved with other events which are ignored.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern operator
    +    */
    +  def followedBy(name: String): FollowedByPattern[T, T] = {
    +    FollowedByPattern(jPattern.followedBy(name))
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filter Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filter: FilterFunction[F]): Pattern[T, F] = {
    +    jPattern.where(filter)
    +    this
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
    +    *
    +    * @param filterFun Filter condition
    +    * @return The same pattern operator where the new filter condition is set
    +    */
    +  def where(filterFun: F => Boolean): Pattern[T, F] = {
    +    val filter = new FilterFunction[F] {
    +      val cleanFilter = cep.scala.cleanClosure(filterFun)
    +
    +      override def filter(value: F): Boolean = cleanFilter(value)
    +    }
    +    where(filter)
    +  }
    +
    +  //TODO ask about java api change <?> -> <? extends T> and creating a new object vs caching object. equals/hashcode?
    +  /**
    +    *
    +    * @return The previous pattern operator
    +    */
    +  def getPrevious: Option[Pattern[T, _ <: T]] = {
    +    val prev = jPattern.getPrevious
    +    if (prev == null) None else Some(wrapPattern(prev))
    +
    +  }
    +
    +}
    +
    +object Pattern {
    +
    +  /**
    +    * Constructs a new Pattern by wrapping a given Java API Pattern
    +    *
    +    * @param jPattern Underlying Java API Pattern.
    +    * @tparam T Base type of the elements appearing in the pattern
    +    * @tparam F Subtype of T to which the current pattern operator is constrained
    +    * @return New wrapping Pattern object
    +    */
    +  def apply[T: ClassTag, F <: T : ClassTag]
    +  (jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern)
    +
    +  /**
    +    * Starts a new pattern with the initial pattern operator whose name is provided. Furthermore,
    +    * the base type of the event sequence is set.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @tparam X Base type of the event pattern
    +    * @return The first pattern operator of a pattern
    +    */
    +  def begin[X: ClassTag]
    --- End diff --
    
    Same here. Formatting is off.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60403183
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala ---
    @@ -0,0 +1,28 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +object StreamEvent {
    +  def of[V](event: V, timestamp: Long): StreamEvent[V] = {
    +    new StreamEvent[V](event, timestamp)
    +  }
    +}
    +
    +class StreamEvent[T](val event: T, val timestamp: Long) {
    --- End diff --
    
    We could reuse the `StreamEvent` class from `flink-cep`.



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60931810
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  *
    +  * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
    +  * a [[org.apache.flink.cep.nfa.NFA]].
    +  *
    +  * {{{
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * }}}
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    --- End diff --
    
    In the Scala world one does not write Java-bean-style getters. Instead one would simply write `def name = jPattern.getName()`. Note that whenever we call Java functions from Scala we add parentheses even though they might not be needed; this underlines that one is calling a Java function, which does not have the notion of purity.
    
    If we really want to make the API identical, meaning that we offer functions like `getName` and `getWindowTime` in the Scala API as well, then we should add parentheses to their definitions to make clear that these are purposefully Java-bean-style getter definitions.
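    
    A small illustrative sketch of the two styles on a made-up wrapper class (only `jPattern.getName()` is taken from the code above):
    
    ```
    class PatternNameSketch[T, F <: T](jPattern: org.apache.flink.cep.pattern.Pattern[T, F]) {

      // Idiomatic Scala accessor: no "get" prefix and no parentheses on the Scala side.
      def name: String = jPattern.getName()

      // Java-bean-style getter kept for API parity; the parentheses signal a call into Java.
      def getName(): String = jPattern.getName()
    }
    ```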



[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60102211
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,216 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <!-- We need to add this explicitly because through shading the dependency on asm seems
    +        to go away. -->
    --- End diff --
    
    What exactly was the problem here? Ideally the module inherits the shading plugin from the parent and everything is shaded correctly.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-212429457
  
    Thanks for your good contribution @StefanRRichter. I had some inline comments.
    
    It would be good to revisit the link tags in the ScalaDocs and replace them with the ScalaDoc link syntax. I haven't marked all occurrences in the code. 
    
    Furthermore, it should be possible to remove the `ClassTag` context bound completely. I have only marked the first occurrences that I noticed.
    
    The Scala code contained several lines which exceeded the maximum line length of 100 characters. The Scala style check plugin, which detects these style violations, is executed when you run `mvn verify`. Thus, it is always a good idea to run `mvn verify` once locally before pushing commits to a PR.
    
    Ping me once you've addressed my comments. Then I'll make sure that the PR is merged.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60394471
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala ---
    @@ -0,0 +1,28 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +object StreamEvent {
    +  def of[V](event: V, timestamp: Long): StreamEvent[V] = {
    +    new StreamEvent[V](event, timestamp)
    +  }
    +}
    +
    +class StreamEvent[T](val event: T, val timestamp: Long) {
    +
    --- End diff --
    
    line breaks unnecessary


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60383097
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    --- End diff --
    
    It's easier to simply write `Option(jPattern.getWindowTime())`
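    
    A sketch of the suggested body (only this method is shown; the rest of the class stays as in the diff above):
    
        // Option(x) is None when x is null, so the explicit null check disappears
        def getWindowTime: Option[Time] = Option(jPattern.getWindowTime())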


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60402984
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +
    +import org.apache.flink.api.java.tuple.Tuple2
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.core.fs.FileSystem
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.{After, Before, Rule, Test}
    +import org.junit.rules.TemporaryFolder
    +
    +
    +@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
    +  private var resultPath: String = null
    +  private var expected: String = null
    +  val _tempFolder = new TemporaryFolder
    +
    +  @Rule
    +  def tempFolder: TemporaryFolder = _tempFolder
    +
    +  @Before
    +  @throws[Exception]
    +  def before {
    +    resultPath = tempFolder.newFile.toURI.toString
    +    expected = ""
    +  }
    +
    +  @After
    +  @throws[Exception]
    +  def after {
    +    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
    +  }
    +
    +  @Test
    +  @throws[Exception]
    +  def testSimplePatternCEP {
    +    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +    val input: DataStream[Event] = env.fromElements(
    +      new Event(1, "barfoo", 1.0),
    +      new Event(2, "start", 2.0),
    +      new Event(3, "foobar", 3.0),
    +      new SubEvent(4, "foo", 4.0, 1.0),
    +      new Event(5, "middle", 5.0),
    +      new SubEvent(6, "middle", 6.0, 2.0),
    +      new SubEvent(7, "bar", 3.0, 3.0),
    +      new Event(42, "42", 42.0),
    +      new Event(8, "end", 1.0))
    +    val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
    +      .where((value: Event) => value.name == "start")
    +      .followedBy("middle")
    +      .subtype(classOf[SubEvent])
    +      .where((value: SubEvent) => value.name == "middle")
    +      .followedBy("end")
    +      .where((value: Event) => value.name == "end")
    +    val result: DataStream[String] = CEP.pattern(input, pattern)
    +      .select((pattern: JMap[String, Event]) => {
    +        val builder: StringBuilder = new StringBuilder
    +        builder.append(pattern.get("start").id)
    +          .append(",")
    +          .append(pattern.get("middle").id)
    +          .append(",")
    +          .append(pattern.get("end").id)
    +          .toString
    +      })
    +    result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
    +    expected = "2,6,8"
    +    env.execute
    +  }
    +
    +  @Test
    +  @throws[Exception]
    +  def testSimpleKeyedPatternCEP {
    +    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(2)
    +    val input: DataStream[Event] = env.fromElements(
    +      new Event(1, "barfoo", 1.0),
    +      new Event(2, "start", 2.0),
    +      new Event(3, "start", 2.1),
    +      new Event(3, "foobar", 3.0),
    +      new SubEvent(4, "foo", 4.0, 1.0),
    +      new SubEvent(3, "middle", 3.2, 1.0),
    +      new Event(42, "start", 3.1),
    +      new SubEvent(42, "middle", 3.3, 1.2),
    +      new Event(5, "middle", 5.0),
    +      new SubEvent(2, "middle", 6.0, 2.0),
    +      new SubEvent(7, "bar", 3.0, 3.0),
    +      new Event(42, "42", 42.0),
    +      new Event(3, "end", 2.0),
    +      new Event(2, "end", 1.0),
    +      new Event(42, "end", 42.0))
    +      .keyBy((value: Event) => value.id)
    +    val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
    +      .where((value: Event) => value.name == "start")
    +      .followedBy("middle")
    +      .subtype(classOf[SubEvent])
    +      .where((value: SubEvent) => value.name == "middle")
    +      .followedBy("end")
    +      .where((value: Event) => value.name == "end")
    +    val result: DataStream[String] = CEP.pattern(input, pattern).select((pattern: JMap[String, Event]) => {
    +      val builder: StringBuilder = new StringBuilder
    +      builder
    +        .append(pattern.get("start").id)
    +        .append(",")
    +        .append(pattern.get("middle").id)
    +        .append(",")
    +        .append(pattern.get("end").id)
    +        .toString
    +    })
    +    result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
    +    expected = "2,2,2\n3,3,3\n42,42,42"
    +    env.execute
    +  }
    +
    +  @Test
    +  @throws[Exception]
    +  def testSimplePatternEventTime {
    +    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    val input: DataStream[Event] = env.fromElements(
    +      Tuple2.of(new Event(1, "start", 1.0), 5L),
    +      Tuple2.of(new Event(2, "middle", 2.0), 1L),
    +      Tuple2.of(new Event(3, "end", 3.0), 3L),
    +      Tuple2.of(new Event(4, "end", 4.0), 10L),
    +      Tuple2.of(new Event(5, "middle", 5.0), 7L),
    +      Tuple2.of(new Event(5, "middle", 5.0), 100L))
    +      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[Tuple2[Event, Long]] {
    +        def extractTimestamp(element: Tuple2[Event, Long], previousTimestamp: Long): Long = {
    +          element.f1
    +        }
    +
    +        def checkAndGetNextWatermark(lastElement: Tuple2[Event, Long], extractedTimestamp: Long): Watermark = {
    +          new Watermark(lastElement.f1 - 5)
    +        }
    +      }).map((value: Tuple2[Event, Long]) => value.f0)
    +
    +    val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
    +      .where((value: Event) => value.name == "start")
    +      .followedBy("middle")
    +      .where((value: Event) => value.name == "middle")
    +      .followedBy("end")
    +      .where((value: Event) => value.name == "end")
    +
    +    val result: DataStream[String] = CEP.pattern(input, pattern)
    +      .select((pattern: JMap[String, Event]) => {
    +        val builder: StringBuilder = new StringBuilder
    +        builder
    +          .append(pattern.get("start").id)
    +          .append(",")
    +          .append(pattern.get("middle").id)
    +          .append(",")
    +          .append(pattern.get("end").id)
    +          .toString
    +      })
    +    result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
    +    expected = "1,5,4"
    +    env.execute
    +  }
    +
    +  @Test
    +  @throws[Exception]
    +  def testSimpleKeyedPatternEventTime {
    +    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setParallelism(2)
    +    val input: DataStream[Event] = env.fromElements(
    +      Tuple2.of(new Event(1, "start", 1.0), 5L),
    +      Tuple2.of(new Event(1, "middle", 2.0), 1L),
    +      Tuple2.of(new Event(2, "middle", 2.0), 4L),
    +      Tuple2.of(new Event(2, "start", 2.0), 3L),
    +      Tuple2.of(new Event(1, "end", 3.0), 3L),
    +      Tuple2.of(new Event(3, "start", 4.1), 5L),
    +      Tuple2.of(new Event(1, "end", 4.0), 10L),
    +      Tuple2.of(new Event(2, "end", 2.0), 8L),
    +      Tuple2.of(new Event(1, "middle", 5.0), 7L),
    +      Tuple2.of(new Event(3, "middle", 6.0), 9L),
    +      Tuple2.of(new Event(3, "end", 7.0), 7L))
    +      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[Tuple2[Event, Long]] {
    +        def extractTimestamp(element: Tuple2[Event, Long], currentTimestamp: Long): Long = {
    +          element.f1
    +        }
    +
    +        def checkAndGetNextWatermark(lastElement: Tuple2[Event, Long], extractedTimestamp: Long): Watermark = {
    +          new Watermark(lastElement.f1 - 5)
    +        }
    +      }).map((value: Tuple2[Event, Long]) => value.f0)
    +      .keyBy((value: Event) => value.id)
    +    val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
    +      .where((value: Event) => value.name == "start")
    +      .followedBy("middle")
    +      .where((value: Event) => value.name == "middle")
    +      .followedBy("end")
    +      .where((value: Event) => value.name == "end")
    +    val result: DataStream[String] = CEP.pattern(input, pattern).select((pattern: JMap[String, Event]) => {
    +      val builder: StringBuilder = new StringBuilder
    +      builder
    +        .append(pattern.get("start").id)
    +        .append(",")
    +        .append(pattern.get("middle").id)
    +        .append(",")
    +        .append(pattern.get("end").id)
    +        .toString
    +    })
    +    result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
    +    expected = "1,1,1\n2,2,2"
    +    env.execute
    +  }
    +
    +
    +  @Test
    +  @throws[Exception]
    +  def testSimplePatternWithSingleState {
    +    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +    val input: DataStream[Tuple2[Int, Int]] = env.fromElements(new Tuple2[Int, Int](0, 1), new Tuple2[Int, Int](0, 2))
    --- End diff --
    
    We should test Scala tuples here and not Flink's Java tuples.
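    
    For example, a sketch relying on the Scala API's implicit `TypeInformation` for tuples:
    
        // Scala tuples instead of org.apache.flink.api.java.tuple.Tuple2
        val input: DataStream[(Int, Int)] = env.fromElements((0, 1), (0, 2))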


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60399173
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.cep.{CEP => JCEP}
    +import org.apache.flink.streaming.api.scala.DataStream
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Utility method to transform a { @link DataStream} into a { @link PatternStream} to do CEP.
    +  */
    +
    +object CEP {
    +  /**
    +    * Transforms a { @link DataStream[T]} into a { @link PatternStream[T]} in the Scala API.
    +    * See { @link org.apache.flink.cep.CEP} for a more detailed description how the underlying
    +    * Java API works.
    --- End diff --
    
    links in comments


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60394562
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/StreamEvent.scala ---
    @@ -0,0 +1,28 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +object StreamEvent {
    +  def of[V](event: V, timestamp: Long): StreamEvent[V] = {
    +    new StreamEvent[V](event, timestamp)
    +  }
    +}
    +
    +class StreamEvent[T](val event: T, val timestamp: Long) {
    --- End diff --
    
    `StreamEvent` could be defined as a case class. Then you don't have to define the factory method.
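    
    A sketch of that simplification (same fields as the helper above):
    
        // the synthesized apply replaces the hand-written `of` factory:
        // StreamEvent("a", 1L) instead of StreamEvent.of("a", 1L)
        case class StreamEvent[T](event: T, timestamp: Long)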


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60400262
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream}
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.util.Collector
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
    +  * pattern sequences as a map of events associated with their names. The pattern is detected using a
    +  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
    +  * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}.
    +  *
    +  * @param jPatternStream Underlying pattern stream from Java API
    +  * @tparam T Type of the events
    +  */
    +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) {
    +
    +  private[flink] def getWrappedPatternStream = jPatternStream
    +
    +  /**
    +    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    +    * provided { @link PatternSelectFunction} is called. The pattern select function can produce
    +    * exactly one resulting element.
    +    *
    +    * @param patternSelectFunction The pattern select function which is called for each detected
    +    *                              pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern select
    +    *         unction.
    +    */
    +  def select[R: TypeInformation : ClassTag](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R] = {
    +    asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]]))
    +  }
    +
    +  /**
    +    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
    +    * the provided { @link PatternFlatSelectFunction} is called. The pattern flat select function
    +    * can produce an arbitrary number of resulting elements.
    +    *
    +    * @param patternFlatSelectFunction The pattern flat select function which is called for each
    +    *                                  detected pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern flat select
    +    *         function.
    +    */
    +  def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFunction: PatternFlatSelectFunction[T, R]): DataStream[R] = {
    +    asScalaStream(jPatternStream.flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
    +  }
    +
    +  /**
    +    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    +    * provided { @link PatternSelectFunction} is called. The pattern select function can produce
    +    * exactly one resulting element.
    +    *
    +    * @param patternSelectFun The pattern select function which is called for each detected
    +    *                         pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern select
    +    *         function.
    +    */
    +  def select[R: TypeInformation : ClassTag](patternSelectFun: JMap[String, T] => R): DataStream[R] = {
    --- End diff --
    
    I think it would be good to pass a Scala `Map` rather than a Java `Map` to the `patternSelectFun`, as you've done in the commented-out code, using an explicit conversion via `JavaConverters` and the `asScala` method.
    
    Semantically, it would be correct to provide an immutable map to the Scala lambda. However, that would mean copying the map. Therefore, for the sake of efficiency, we could simply pass in the result of the `asScala` call, which is a mutable `Map`.
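    
    A sketch of how the lambda-accepting `select` could do this inside `PatternStream[T]` (adapted from the diff above; the final implementation may differ):
    
        import java.util.{Map => JMap}
        import scala.collection.JavaConverters._
        import scala.collection.mutable
    
        def select[R: TypeInformation](patternSelectFun: mutable.Map[String, T] => R): DataStream[R] = {
          val patternSelectFunction = new PatternSelectFunction[T, R] {
            // asScala only wraps the Java map, so no copy is made
            override def select(pattern: JMap[String, T]): R = patternSelectFun(pattern.asScala)
          }
          asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]]))
        }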


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60923941
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  *
    +  * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
    +  * a [[org.apache.flink.cep.nfa.NFA]].
    +  *
    +  * {{{
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * }}}
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
    --- End diff --
    
    Why do we need the `ClassTag` here? I don't think it should be necessary.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60924299
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.api.common.functions.util.ListCollector
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.streaming.api.transformations.OneInputTransformation
    +import org.apache.flink.util.{Collector, TestLogger}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
    +
    +
    --- End diff --
    
    two line breaks


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-214806804
  
    The failing test case is unrelated. Really good work @StefanRRichter. I will merge it.
    
    As a follow-up, we should update the CEP documentation to also include Scala code examples.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60925320
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.api.common.functions.util.ListCollector
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.streaming.api.transformations.OneInputTransformation
    +import org.apache.flink.util.{Collector, TestLogger}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
    +
    +
    +  @Test
    +  @throws[Exception]
    +  def testScalaJavaAPISelectFunForwarding {
    +    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
    +    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
    +    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
    +    val param = mutable.Map("begin" ->(1, 2)).asJava
    +    val result: DataStream[(Int, Int)] = pStream
    +      .select((pattern: mutable.Map[String, (Int, Int)]) => {
    +        //verifies input parameter forwarding
    +        assertEquals(param, pattern.asJava)
    +        param.get("begin")
    +      })
    +    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
    +      .getUserFunction.map(param)
    +    //verifies output parameter forwarding
    +    assertEquals(param.get("begin"), out)
    +  }
    +
    +  @Test
    +  @throws[Exception]
    +  def testScalaJavaAPIFlatSelectFunForwarding {
    +    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    +    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
    +    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
    +    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
    +    val inList = List(1, 2, 3)
    +    val inParam = mutable.Map("begin" -> inList).asJava
    +    val outList = new java.util.ArrayList[List[Int]]
    +    val outParam = new ListCollector[List[Int]](outList)
    +
    +    val result: DataStream[List[Int]] = pStream
    +
    +      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
    +        //verifies input parameter forwarding
    +        assertEquals(inParam, pattern.asJava)
    +        out.collect(pattern.get("begin").get)
    +      })
    +
    +    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
    +      getUserFunction.flatMap(inParam, outParam)
    +    //verify output parameter forwarding and that flatMap function was actually called
    +    assertEquals(inList, outList.get(0))
    +  }
    +
    +  def extractUserFunction[T](dataStream: DataStream[_]) = {
    +    dataStream.javaStream
    +      .getTransformation
    +      .asInstanceOf[OneInputTransformation[_, _]]
    +      .getOperator
    +      .asInstanceOf[T]
    +  }
    --- End diff --
    
    Good tests :-)


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60403078
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.lang.reflect.Method
    +
    +import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
    +import org.apache.flink.cep.{PatternStream => JPatternStream}
    +import org.junit.Test
    +
    +import scala.language.existentials
    +
    +/**
    + * This checks whether the CEP Scala API is up to feature parity with the Java API.
    + * Implements the {@link ScalaAPICompletenessTest} for CEP.
    + */
    +class CEPScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
    --- End diff --
    
    Good one :-)


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60399038
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.cep.{CEP => JCEP}
    +import org.apache.flink.streaming.api.scala.DataStream
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Utility method to transform a { @link DataStream} into a { @link PatternStream} to do CEP.
    --- End diff --
    
    In ScalaDoc, links are written with the syntax `[[DataStream]]` rather than Javadoc's `{@link}` tag.
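    
    For example, the quoted comment becomes (a sketch):
    
        /**
          * Utility to transform a [[DataStream]] into a [[PatternStream]] to do CEP.
          */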


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60407197
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +
    +import org.apache.flink.api.java.tuple.Tuple2
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.core.fs.FileSystem
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.{After, Before, Rule, Test}
    +import org.junit.rules.TemporaryFolder
    +
    +
    +@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
    --- End diff --
    
    I think we should not simply copy the Java `CEPITCase` here, because it would test to a large extent what is already covered by the Java `CEPITCase`.
    
    What is more interesting to test is whether the Scala CEP API can work with pure Scala types, such as a Scala tuple or a Scala collection.
    
    Furthermore, it would be interesting to see whether the `select` call is correctly forwarded to the Scala function. A full IT case should not be strictly necessary for that: you should be able to extract the mapper function from the `Transformation`, pass a test value into it, and check that the value arrives in the Scala function; if it does, the forwarding is correct.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60104038
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
    +
    +import scala.reflect.ClassTag
    +
    +
    +object FollowedByPattern {
    +  def apply[T : ClassTag, F <: T : ClassTag]
    +  (jfbPattern: JFollowedByPattern[T, F]) = new FollowedByPattern[T, F](jfbPattern)
    +}
    +
    +/**
    +  * Created by stefan on 16.04.16.
    --- End diff --
    
    Sure. Thought I already removed all of it.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60102619
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
    +
    +import scala.reflect.ClassTag
    +
    +
    +object FollowedByPattern {
    +  def apply[T : ClassTag, F <: T : ClassTag]
    +  (jfbPattern: JFollowedByPattern[T, F]) = new FollowedByPattern[T, F](jfbPattern)
    +}
    +
    +/**
    +  * Created by stefan on 16.04.16.
    --- End diff --
    
    Can you remove this?


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-211843938
  
    It looks like it was added as part of [FLINK-3136](https://issues.apache.org/jira/browse/FLINK-3136).


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60397035
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    --- End diff --
    
    Not needed


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60402681
  
    --- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +
    +import org.apache.flink.api.java.tuple.Tuple2
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.core.fs.FileSystem
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.streaming.api.windowing.time.Time
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit.{After, Before, Rule, Test}
    +import org.junit.rules.TemporaryFolder
    +
    +
    +@SuppressWarnings(Array("serial")) class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
    --- End diff --
    
    Just my personal preference: I would put the annotation on a separate line.
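
    For illustration, the suggested layout (purely cosmetic, the test body stays exactly as in the diff):

    ```
    @SuppressWarnings(Array("serial"))
    class CEPITCase extends ScalaStreamingMultipleProgramsTestBase {
      // test cases unchanged
    }
    ```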


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1905#issuecomment-214391606
  
    Good work @StefanRRichter. I had only some minor comments left. Once these are addressed, the PR should be ready to be merged :-)


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60394116
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---
    @@ -61,19 +61,36 @@
     	public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
     		// we have to extract the output type from the provided pattern selection function manually
     		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
    -		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
    -			patternSelectFunction,
    -			PatternSelectFunction.class,
    -			1,
    -			-1,
    -			inputType,
    -			null,
    -			false);
     
    +		TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
    +					patternSelectFunction,
    +					PatternSelectFunction.class,
    +					1,
    +					-1,
    +					inputType,
    +					null,
    +					false);
    --- End diff --
    
    Indentation


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60402439
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
    +
    +import _root_.scala.reflect.ClassTag
    +
    +package object pattern {
    +  /**
    +    * Utility method to wrap { @link org.apache.flink.cep.pattern.Pattern} and its subclasses
    +    * for usage with the Scala API.
    +    *
    +    * @param javaPattern The underlying pattern from the Java API
    +    * @tparam T Base type of the elements appearing in the pattern
    +    * @tparam F Subtype of T to which the current pattern operator is constrained
    +    * @return A pattern from the Scala API which wraps the pattern from the Java API
    +    */
    +  private[flink] def wrapPattern[
    +  T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
    +  : Pattern[T, F] = javaPattern match {
    +    case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
    +    case p: JPattern[T, F] => Pattern[T, F](p)
    +    case _ => null
    --- End diff --
    
    The `null` case should throw an exception, because we cannot continue from here.
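
    A possible shape for that change, sketched with an `IllegalArgumentException` (the concrete exception type is just one reasonable choice, not something the PR prescribes):

    ```
    private[flink] def wrapPattern[T: ClassTag, F <: T : ClassTag](javaPattern: JPattern[T, F])
      : Pattern[T, F] = javaPattern match {
      case f: JFollowedByPattern[T, F] => FollowedByPattern[T, F](f)
      case p: JPattern[T, F] => Pattern[T, F](p)
      // fail fast instead of handing a null wrapper back to the caller
      case _ => throw new IllegalArgumentException(s"Cannot wrap pattern: $javaPattern")
    }
    ```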


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60397159
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-clients_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    --- End diff --
    
    Not needed


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60945888
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,122 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <!-- We need to add this explicitly because through shading the dependency on asm seems
    +        to go away. TODO -->
    +        <dependency>
    +            <groupId>org.ow2.asm</groupId>
    +            <artifactId>asm</artifactId>
    +            <version>${asm.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +    </dependencies>
    +
    +    <build>
    +        <plugins>
    +            <!-- Scala Compiler -->
    +            <plugin>
    +                <groupId>org.scala-tools</groupId>
    +                <artifactId>maven-scala-plugin</artifactId>
    +                <version>2.15.2</version>
    +                <executions>
    +                    <execution>
    +                        <goals>
    +                            <goal>compile</goal>
    +                            <goal>testCompile</goal>
    +                        </goals>
    +                    </execution>
    +                </executions>
    +                <configuration>
    +                    <sourceDir>src/main/scala</sourceDir>
    +                    <testSourceDir>src/test/scala</testSourceDir>
    +                    <jvmArgs>
    +                        <jvmArg>-Xms64m</jvmArg>
    +                        <jvmArg>-Xmx1024m</jvmArg>
    +                    </jvmArgs>
    +                </configuration>
    +            </plugin>
    --- End diff --
    
    Can we maybe replace this plugin with
    ```
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
            <execution>
                <goals>
                    <goal>compile</goal>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    ```
    I gave you the definition for the old Scala plugin. Sorry, my bad.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60932255
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  *
    +  * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
    +  * a [[org.apache.flink.cep.nfa.NFA]].
    +  *
    +  * {{{
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * }}}
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is constrained
    +  */
    +class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    --- End diff --
    
    I know this might be a bit confusing since you don't know the convention which is only "stated" implicitly. 
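
    To make the convention explicit (assuming it is the usual Scala style of parameterless accessors without a `get` prefix; the earlier comment is not quoted here, so this is only my reading of it), the accessor would read roughly:

    ```
    // Scala-style accessor delegating to the wrapped Java pattern
    def name: String = jPattern.getName
    ```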


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60924026
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern}
    +
    +import scala.reflect.ClassTag
    +
    +object FollowedByPattern {
    +  /**
    +    * Constructs a new Pattern by wrapping a given Java API Pattern
    +    *
    +    * @param jfbPattern Underlying Java API Pattern.
    +    * @tparam T Base type of the elements appearing in the pattern
    +    * @tparam F Subtype of T to which the current pattern operator is constrained
    +    * @return New wrapping FollowedByPattern object
    +    */
    +  def apply[T: ClassTag, F <: T](jfbPattern: JFollowedByPattern[T, F]) =
    --- End diff --
    
    I think the `ClassTag` should not be necessary here.
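
    I.e. the factory could be reduced to something like this (assuming the class simply wraps the Java pattern, the same way `Pattern` does):

    ```
    object FollowedByPattern {
      def apply[T, F <: T](jfbPattern: JFollowedByPattern[T, F]): FollowedByPattern[T, F] =
        new FollowedByPattern[T, F](jfbPattern)
    }
    ```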


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60398887
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.scala.pattern.Pattern
    +import org.apache.flink.cep.{CEP => JCEP}
    +import org.apache.flink.streaming.api.scala.DataStream
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Utility method to transform a { @link DataStream} into a { @link PatternStream} to do CEP.
    +  */
    +
    +object CEP {
    +  /**
    +    * Transforms a { @link DataStream[T]} into a { @link PatternStream[T]} in the Scala API.
    +    * See { @link org.apache.flink.cep.CEP} for a more detailed description how the underlying
    +    * Java API works.
    +    *
    +    * @param input   DataStream containing the input events
    +    * @param pattern Pattern specification which shall be detected
    +    * @tparam T Type of the input events
    +    * @return Resulting pattern stream
    +    */
    +  def pattern[T: TypeInformation : ClassTag](input: DataStream[T], pattern: Pattern[T, _]): PatternStream[T] = {
    --- End diff --
    
    `ClassTag` not needed
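
    The signature could then be reduced to something like the following (the body is omitted here and assumed to stay as in the PR):

    ```
    def pattern[T: TypeInformation](input: DataStream[T], pattern: Pattern[T, _]): PatternStream[T] = {
      // wrap the result of the Java CEP.pattern(...) call, exactly as in the PR
      ???
    }
    ```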


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60399207
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream}
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.util.Collector
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
    +  * pattern sequences as a map of events associated with their names. The pattern is detected using a
    +  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
    +  * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}.
    +  *
    +  * @param jPatternStream Underlying pattern stream from Java API
    +  * @tparam T Type of the events
    +  */
    +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) {
    --- End diff --
    
    `ClassTag` not needed


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60399384
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala
    +
    +import java.util.{Map => JMap}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, PatternStream => JPatternStream}
    +import org.apache.flink.streaming.api.scala._
    +import org.apache.flink.util.Collector
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
    +  * pattern sequences as a map of events associated with their names. The pattern is detected using a
    +  * { @link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
    +  * has to specify a { @link PatternSelectFunction} or a { @link PatternFlatSelectFunction}.
    +  *
    +  * @param jPatternStream Underlying pattern stream from Java API
    +  * @tparam T Type of the events
    +  */
    +class PatternStream[T: TypeInformation : ClassTag](jPatternStream: JPatternStream[T]) {
    +
    +  private[flink] def getWrappedPatternStream = jPatternStream
    +
    +  /**
    +    * Applies a select function to the detected pattern sequence. For each pattern sequence the
    +    * provided { @link PatternSelectFunction} is called. The pattern select function can produce
    +    * exactly one resulting element.
    +    *
    +    * @param patternSelectFunction The pattern select function which is called for each detected
    +    *                              pattern sequence.
    +    * @tparam R Type of the resulting elements
    +    * @return { @link DataStream} which contains the resulting elements from the pattern select
    +    *         unction.
    +    */
    +  def select[R: TypeInformation : ClassTag](patternSelectFunction: PatternSelectFunction[T, R]): DataStream[R] = {
    --- End diff --
    
    `ClassTag` not needed


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60924860
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-clients_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    --- End diff --
    
    I think `flink-streaming-java_2.10` is not needed as a dependency.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60398573
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-clients_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +            <type>test-jar</type>
    +        </dependency>
    +    </dependencies>
    +
    +    <build>
    +        <plugins>
    +            <!-- Scala Compiler -->
    +            <plugin>
    +                <groupId>net.alchim31.maven</groupId>
    +                <artifactId>scala-maven-plugin</artifactId>
    +                <version>3.1.4</version>
    +                <executions>
    +                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
    +                        scala classes can be resolved later in the (Java) compile phase -->
    +                    <execution>
    +                        <id>scala-compile-first</id>
    +                        <phase>process-resources</phase>
    +                        <goals>
    +                            <goal>compile</goal>
    +                        </goals>
    +                    </execution>
    +
    +                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
    +                         scala classes can be resolved later in the (Java) test-compile phase -->
    +                    <execution>
    +                        <id>scala-test-compile</id>
    +                        <phase>process-test-resources</phase>
    +                        <goals>
    +                            <goal>testCompile</goal>
    +                        </goals>
    +                    </execution>
    +                </executions>
    +                <configuration>
    +                    <jvmArgs>
    +                        <jvmArg>-Xms128m</jvmArg>
    +                        <jvmArg>-Xmx512m</jvmArg>
    +                    </jvmArgs>
    +                    <compilerPlugins combine.children="append">
    +                        <compilerPlugin>
    +                            <groupId>org.scalamacros</groupId>
    +                            <artifactId>paradise_${scala.version}</artifactId>
    +                            <version>${scala.macros.version}</version>
    +                        </compilerPlugin>
    +                    </compilerPlugins>
    +                </configuration>
    +            </plugin>
    +
    +            <!-- Eclipse Integration -->
    +            <plugin>
    +                <groupId>org.apache.maven.plugins</groupId>
    +                <artifactId>maven-eclipse-plugin</artifactId>
    +                <version>2.8</version>
    +                <configuration>
    +                    <downloadSources>true</downloadSources>
    +                    <projectnatures>
    +                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
    +                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
    +                    </projectnatures>
    +                    <buildcommands>
    +                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
    +                    </buildcommands>
    +                    <classpathContainers>
    +                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
    +                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
    +                    </classpathContainers>
    +                    <excludes>
    +                        <exclude>org.scala-lang:scala-library</exclude>
    +                        <exclude>org.scala-lang:scala-compiler</exclude>
    +                    </excludes>
    +                    <sourceIncludes>
    +                        <sourceInclude>**/*.scala</sourceInclude>
    +                        <sourceInclude>**/*.java</sourceInclude>
    +                    </sourceIncludes>
    +                </configuration>
    +            </plugin>
    --- End diff --
    
    I'm not sure how relevant this is anymore.


---

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60397079
  
    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,209 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-libraries</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +    <artifactId>flink-cep-scala_2.10</artifactId>
    +    <name>flink-cep-scala</name>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-scala_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-clients_2.10</artifactId>
    +            <version>${project.version}</version>
    +			<scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-cep_2.10</artifactId>
    +            <version>${project.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    --- End diff --
    
    Not needed, since it is transitively pulled in by `flink-streaming-scala_2.10`.


---