You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@bahir.apache.org by haoch <gi...@git.apache.org> on 2017/11/18 14:18:20 UTC
[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...
GitHub user haoch opened a pull request:
https://github.com/apache/bahir-flink/pull/22
[BAHIR-144] Add flink-library-siddhi from Flink
Moved from https://github.com/apache/flink/pull/2487 as suggested by Flink community.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/haoch/bahir-flink BAHIR-144
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/bahir-flink/pull/22.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22
----
commit e1932fad01791f0a967f0804bc981762172d7053
Author: Hao Chen <hc...@ebay.com>
Date: 2017-11-18T14:15:32Z
Add flink-library-siddhi
----
---
[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink
Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:
https://github.com/apache/bahir-flink/pull/22
I'm going to merge this pull request now. Sorry for the delay.
---
[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...
Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/bahir-flink/pull/22#discussion_r151840400
--- Diff: flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java ---
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Flink-siddhi library integration test cases
+ */
+public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
+
+ @Rule
+ public transient TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testSimpleWriteAndRead() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ String path = tempFolder.newFile().toURI().toString();
+ input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
+ @Override
+ public Event map(Event event) throws Exception {
+ return event;
+ }
+ })).writeAsText(path);
+ env.execute();
+ Assert.assertEquals(6, getLineCount(path));
+ }
+
+ @Test
+ public void testSimplePojoStreamAndReturnPojo() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ DataStream<Event> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price")
+ .cql("from inputStream insert into outputStream")
+ .returns("outputStream", Event.class);
+ String path = tempFolder.newFile().toURI().toString();
+ output.print();
+ env.execute();
+ // Assert.assertEquals(6, getLineCount(path));
--- End diff --
This looks like being accidentally commented out
---
[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink
Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:
https://github.com/apache/bahir-flink/pull/22
Thanks a lot for moving the PR over here!
---
[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...
Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/bahir-flink/pull/22#discussion_r152103730
--- Diff: flink-library-siddhi/pom.xml ---
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink-parent_2.11</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-library-siddhi_2.11</artifactId>
+ <name>flink-library-siddhi</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <siddhi.version>4.0.0-M120</siddhi.version>
+ </properties>
+
+ <dependencies>
+ <!-- core dependencies -->
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-core</artifactId>
+ <version>${siddhi.version}</version>
+ <exclusions>
+ <exclusion> <!-- declare the exclusion here -->
+ <groupId>org.apache.directory.jdbm</groupId>
+ <artifactId>apacheds-jdbm1</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-query-api</artifactId>
--- End diff --
Dependency is Apache 2.0 licensed --> good.
---
[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink
Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:
https://github.com/apache/bahir-flink/pull/22
Thanks very much!
---
[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink
Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:
https://github.com/apache/bahir-flink/pull/22
@rmetzger what's the plan or additional requirements for merging it?
---
[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/bahir-flink/pull/22
---
[GitHub] bahir-flink pull request #22: [BAHIR-144] Add flink-library-siddhi from Flin...
Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on a diff in the pull request:
https://github.com/apache/bahir-flink/pull/22#discussion_r152175023
--- Diff: flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java ---
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.siddhi;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Flink-siddhi library integration test cases
+ */
+public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable {
+
+ @Rule
+ public transient TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testSimpleWriteAndRead() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ String path = tempFolder.newFile().toURI().toString();
+ input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() {
+ @Override
+ public Event map(Event event) throws Exception {
+ return event;
+ }
+ })).writeAsText(path);
+ env.execute();
+ Assert.assertEquals(6, getLineCount(path));
+ }
+
+ @Test
+ public void testSimplePojoStreamAndReturnPojo() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Event> input = env.fromElements(
+ Event.of(1, "start", 1.0),
+ Event.of(2, "middle", 2.0),
+ Event.of(3, "end", 3.0),
+ Event.of(4, "start", 4.0),
+ Event.of(5, "middle", 5.0),
+ Event.of(6, "end", 6.0)
+ );
+
+ DataStream<Event> output = SiddhiCEP
+ .define("inputStream", input, "id", "name", "price")
+ .cql("from inputStream insert into outputStream")
+ .returns("outputStream", Event.class);
+ String path = tempFolder.newFile().toURI().toString();
+ output.print();
+ env.execute();
+ // Assert.assertEquals(6, getLineCount(path));
--- End diff --
Resolved
---
[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink
Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:
https://github.com/apache/bahir-flink/pull/22
@rmetzger thanks for the review. As to documentation, I will continue to add more java doc and contribute the web site later.
BTW: What's bahir's site repo?
---
[GitHub] bahir-flink issue #22: [BAHIR-144] Add flink-library-siddhi from Flink
Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:
https://github.com/apache/bahir-flink/pull/22
I think the change is good to be merged.
The tests might be unstable, but maybe that's just a local issue...we will see.
I think we should then soon add documentation for Siddhi as well.
I'm going to keep the PR open for a few more days in case somebody else wants to take a look.
---