Posted to reviews@bahir.apache.org by haoch <gi...@git.apache.org> on 2017/11/18 14:18:20 UTC

[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...

GitHub user haoch opened a pull request:

    https://github.com/apache/bahir-flink/pull/22

    [BAHIR-144] Add flink-library-siddhi from Flink

    Moved from https://github.com/apache/flink/pull/2487 as suggested by the Flink community.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haoch/bahir-flink BAHIR-144

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir-flink/pull/22.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22
    
----
commit e1932fad01791f0a967f0804bc981762172d7053
Author: Hao Chen <hc...@ebay.com>
Date:   2017-11-18T14:15:32Z

    Add flink-library-siddhi

----


---

[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/22
  
    I'm going to merge this pull request now. Sorry for the delay.


---

[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/22#discussion_r151840400
  
    --- Diff: flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java ---
    @@ -0,0 +1,404 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.siddhi;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.flink.api.common.functions.InvalidTypesException;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.api.java.tuple.Tuple5;
    +import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
    +import org.apache.flink.streaming.siddhi.source.Event;
    +import org.apache.flink.streaming.siddhi.source.RandomEventSource;
    +import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
    +import org.apache.flink.streaming.siddhi.source.RandomWordSource;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    +import org.apache.flink.streaming.api.operators.StreamMap;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Flink-siddhi library integration test cases
    + */
    +public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
    +
    +    @Rule
    +    public transient TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +    @Test
    +    public void testSimpleWriteAndRead() throws Exception {
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        DataStream<Event> input = env.fromElements(
    +            Event.of(1, "start", 1.0),
    +            Event.of(2, "middle", 2.0),
    +            Event.of(3, "end", 3.0),
    +            Event.of(4, "start", 4.0),
    +            Event.of(5, "middle", 5.0),
    +            Event.of(6, "end", 6.0)
    +        );
    +
    +        String path = tempFolder.newFile().toURI().toString();
    +        input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
    +            @Override
    +            public Event map(Event event) throws Exception {
    +                return event;
    +            }
    +        })).writeAsText(path);
    +        env.execute();
    +        Assert.assertEquals(6, getLineCount(path));
    +    }
    +
    +    @Test
    +    public void testSimplePojoStreamAndReturnPojo() throws Exception {
    +        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +        DataStream<Event> input = env.fromElements(
    +            Event.of(1, "start", 1.0),
    +            Event.of(2, "middle", 2.0),
    +            Event.of(3, "end", 3.0),
    +            Event.of(4, "start", 4.0),
    +            Event.of(5, "middle", 5.0),
    +            Event.of(6, "end", 6.0)
    +        );
    +
    +        DataStream<Event> output = SiddhiCEP
    +            .define("inputStream", input, "id", "name", "price")
    +            .cql("from inputStream insert into  outputStream")
    +            .returns("outputStream", Event.class);
    +        String path = tempFolder.newFile().toURI().toString();
    +        output.print();
    +        env.execute();
    +        // Assert.assertEquals(6, getLineCount(path));
    --- End diff --
    
    This assertion looks like it was accidentally commented out.
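
    For context: the quoted test calls a getLineCount(path) helper that is not part of the diff shown here. The following is only a minimal sketch of what such a helper might look like, assuming the sink writes a single text file and that the path is passed as a file URI string (as produced by tempFolder.newFile().toURI().toString()). The class name LineCountSketch is hypothetical, and the real helper in the pull request may be implemented differently.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.stream.Stream;

public class LineCountSketch {

    // Hypothetical stand-in for the getLineCount helper used by the test.
    // Assumption: fileUri points at one regular text file, not a directory
    // of part files.
    public static int getLineCount(String fileUri) throws IOException {
        Path path = Paths.get(URI.create(fileUri));
        // Files.lines streams the file lazily; close it when done.
        try (Stream<String> lines = Files.lines(path)) {
            return (int) lines.count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Write six lines to a temporary file, mirroring the six events
        // in the test input, then count them.
        Path tmp = Files.createTempFile("siddhi-output", ".txt");
        Files.write(tmp, Arrays.asList("1", "2", "3", "4", "5", "6"));
        System.out.println(getLineCount(tmp.toUri().toString()));
        Files.delete(tmp);
    }
}
```

    For the restored assertion to be meaningful, the test would also have to write its output to path (for example with writeAsText(path), as testSimpleWriteAndRead does) rather than only calling print(). If the sink runs with parallelism greater than one, the output may be a directory of part files, which this sketch does not handle.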


---

[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/22
  
    Thanks a lot for moving the PR over here!


---

[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/22#discussion_r152103730
  
    --- Diff: flink-library-siddhi/pom.xml ---
    @@ -0,0 +1,113 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  ~ Licensed to the Apache Software Foundation (ASF) under one or more
    +  ~ contributor license agreements.  See the NOTICE file distributed with
    +  ~ this work for additional information regarding copyright ownership.
    +  ~ The ASF licenses this file to You under the Apache License, Version 2.0
    +  ~ (the "License"); you may not use this file except in compliance with
    +  ~ the License.  You may obtain a copy of the License at
    +  ~
    +  ~    http://www.apache.org/licenses/LICENSE-2.0
    +  ~
    +  ~ Unless required by applicable law or agreed to in writing, software
    +  ~ distributed under the License is distributed on an "AS IS" BASIS,
    +  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  ~ See the License for the specific language governing permissions and
    +  ~ limitations under the License.
    +  -->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <parent>
    +        <groupId>org.apache.bahir</groupId>
    +        <artifactId>bahir-flink-parent_2.11</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <artifactId>flink-library-siddhi_2.11</artifactId>
    +    <name>flink-library-siddhi</name>
    +
    +    <packaging>jar</packaging>
    +
    +    <properties>
    +        <siddhi.version>4.0.0-M120</siddhi.version>
    +    </properties>
    +
    +    <dependencies>
    +        <!-- core dependencies -->
    +        <dependency>
    +            <groupId>org.wso2.siddhi</groupId>
    +            <artifactId>siddhi-core</artifactId>
    +            <version>${siddhi.version}</version>
    +            <exclusions>
    +                <exclusion>  <!-- declare the exclusion here -->
    +                    <groupId>org.apache.directory.jdbm</groupId>
    +                    <artifactId>apacheds-jdbm1</artifactId>
    +                </exclusion>
    +            </exclusions>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.wso2.siddhi</groupId>
    +            <artifactId>siddhi-query-api</artifactId>
    --- End diff --
    
    Dependency is Apache 2.0 licensed --> good.


---

[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/bahir-flink/pull/22
  
    Thanks very much!


---

[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/bahir-flink/pull/22
  
    @rmetzger What's the plan for merging this, and are there any additional requirements?


---

[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir-flink/pull/22


---

[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/22#discussion_r152175023
  
    --- Diff: flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java ---
    +        // Assert.assertEquals(6, getLineCount(path));
    --- End diff --
    
    Resolved


---

[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/bahir-flink/pull/22
  
    @rmetzger Thanks for the review. As for documentation, I will continue to add more Javadoc and contribute to the website later.
    
    BTW: What is Bahir's site repo?


---

[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/22
  
    I think the change is ready to be merged.
    The tests might be unstable, but maybe that's just a local issue; we will see.
    We should also add documentation for Siddhi soon.
    
    I'm going to keep the PR open for a few more days in case somebody else wants to take a look.


---