You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/28 15:36:29 UTC
[3/7] flink git commit: [FLINK-1874] [streaming] Connectors separated
into individual projects
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
new file mode 100644
index 0000000..4bc6df0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * This program demonstrate the use of TwitterSource.
+ * Its aim is to count the frequency of the languages of tweets
+ */
+public class TwitterTopology {
+
+ private static final int NUMBEROFTWEETS = 100;
+
+ /**
+ * FlatMapFunction to determine the language of tweets if possible
+ */
+ public static class SelectLanguageFlatMap extends
+ JSONParseFlatMap<String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Select the language from the incoming JSON text
+ */
+ @Override
+ public void flatMap(String value, Collector<String> out) throws Exception {
+ try{
+ out.collect(getString(value, "lang"));
+ }
+ catch (JSONException e){
+ out.collect("");
+ }
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ String path = new String();
+
+ if (args != null && args.length == 1) {
+ path = args[0];
+ } else {
+ System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>");
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
+
+
+ DataStream<Tuple2<String, Integer>> dataStream = streamSource
+ .flatMap(new SelectLanguageFlatMap())
+ .map(new MapFunction<String, Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> map(String value) throws Exception {
+ return new Tuple2<String, Integer>(value, 1);
+ }
+ })
+ .groupBy(0)
+ .sum(1);
+
+ dataStream.print();
+
+ env.execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
new file mode 100644
index 0000000..b1d4115
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class JSONParserTest {
+
+ private String jsonText;
+ private String searchedField;
+
+ public JSONParserTest(String text, String field) {
+ jsonText = text;
+ searchedField = field;
+ }
+
+ @Parameters
+ public static Collection<Object[]> initParameterList() {
+
+ Object[][] parameterList = new Object[][] {
+ { "{\"key\":\"value\"}", "key" },
+ { "{\"key\":[\"value\"]}", "key[0]" },
+ { "{\"key\":[{\"key\":\"value\"}]}", "key[0].key" },
+ { "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", "key[0].key[0].key"},
+ { "{\"key\":[1,[{\"key\":\"value\"}]]}", "key[1][0].key" },
+ { "{\"key\":[1,[[\"key\",2,\"value\"]]]}", "key[1][0][2]" },
+ { "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
+ };
+
+ return Arrays.asList(parameterList);
+ }
+
+ @Test
+ public void test() {
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+ String expected = "{\"retValue\":\"value\"}";
+
+ assertTrue(expected.equals(jo.toString()));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
new file mode 100644
index 0000000..8851086
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+
+
+public class JSONParserTest2 {
+
+ @Test
+ public void testGetBooleanFunction() {
+ String jsonText = "{\"key\":true}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertTrue(jo.getBoolean("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetDoubleFunction() {
+ double expected = 12345.12345;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getDouble("retValue"),0.000001);
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetIntFunction() {
+ int expected = 15;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getInt("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetLongFunction() {
+ long expected = 111111111111L;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getLong("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9ede613
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 1ecf6c1..4b82d09 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -30,215 +30,16 @@ under the License.
<relativePath>..</relativePath>
</parent>
- <artifactId>flink-streaming-connectors</artifactId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
<name>flink-streaming-connectors</name>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <kafka.version>0.8.2.0</kafka.version>
- <rabbitmq.version>3.3.1</rabbitmq.version>
- <flume-ng.version>1.5.0</flume-ng.version>
- </properties>
+ <modules>
+ <module>flink-connector-flume</module>
+ <module>flink-connector-kafka</module>
+ <module>flink-connector-rabbitmq</module>
+ <module>flink-connector-twitter</module>
+ </modules>
- <dependencies>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.yammer.metrics</groupId>
- <artifactId>metrics-annotation</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-test</artifactId>
- <version>2.7.1</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>${rabbitmq.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>${flume-ng.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.tukaani</groupId>
- <artifactId>xz</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.velocity</groupId>
- <artifactId>velocity</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>hbc-core</artifactId>
- <version>2.2.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.sling</groupId>
- <artifactId>org.apache.sling.commons.json</artifactId>
- <version>2.0.6</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <execution>
- <id>shade-flink</id>
- <configuration>
- <artifactSet>
- <includes combine.children="append">
- <!-- We include all dependencies that transitively depend on guava -->
- <include>org.apache.flume:*</include>
- <include>com.twitter:hbc-core</include>
- <include>com.twitter:joauth</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
deleted file mode 100644
index fde4cdf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>{
-
- private static final long serialVersionUID = 1L;
- protected DeserializationSchema<OUT> schema;
-
- public ConnectorSource(DeserializationSchema<OUT> schema) {
- this.schema = schema;
- }
-
- @Override
- public TypeInformation<OUT> getProducedType() {
- return schema.getProducedType();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
deleted file mode 100644
index 50f5770..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlumeSink<IN> extends RichSinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
-
- private transient FlinkRpcClientFacade client;
- boolean initDone = false;
- String host;
- int port;
- SerializationSchema<IN, byte[]> schema;
-
- public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
- this.host = host;
- this.port = port;
- this.schema = schema;
- }
-
- /**
- * Receives tuples from the Apache Flink {@link DataStream} and forwards
- * them to Apache Flume.
- *
- * @param value
- * The tuple arriving from the datastream
- */
- @Override
- public void invoke(IN value) {
-
- byte[] data = schema.serialize(value);
- client.sendDataToFlume(data);
-
- }
-
- private class FlinkRpcClientFacade {
- private RpcClient client;
- private String hostname;
- private int port;
-
- /**
- * Initializes the connection to Apache Flume.
- *
- * @param hostname
- * The host
- * @param port
- * The port.
- */
- public void init(String hostname, int port) {
- // Setup the RPC connection
- this.hostname = hostname;
- this.port = port;
- int initCounter = 0;
- while (true) {
- if (initCounter >= 90) {
- throw new RuntimeException("Cannot establish connection with" + port + " at "
- + host);
- }
- try {
- this.client = RpcClientFactory.getDefaultInstance(hostname, port);
- } catch (FlumeException e) {
- // Wait one second if the connection failed before the next
- // try
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Interrupted while trying to connect {} at {}", port, host);
- }
- }
- }
- if (client != null) {
- break;
- }
- initCounter++;
- }
- initDone = true;
- }
-
- /**
- * Sends byte arrays as {@link Event} series to Apache Flume.
- *
- * @param data
- * The byte array to send to Apache FLume
- */
- public void sendDataToFlume(byte[] data) {
- Event event = EventBuilder.withBody(data);
-
- try {
- client.append(event);
-
- } catch (EventDeliveryException e) {
- // clean up and recreate the client
- client.close();
- client = null;
- client = RpcClientFactory.getDefaultInstance(hostname, port);
- }
- }
-
- }
-
- @Override
- public void close() {
- client.client.close();
- }
-
- @Override
- public void open(Configuration config) {
- client = new FlinkRpcClientFacade();
- client.init(host, port);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
deleted file mode 100644
index bb9ce38..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ /dev/null
@@ -1,149 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import java.util.List;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.connectors.ConnectorSource;
-//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-//import org.apache.flink.util.Collector;
-//import org.apache.flume.Context;
-//import org.apache.flume.channel.ChannelProcessor;
-//import org.apache.flume.source.AvroSource;
-//import org.apache.flume.source.avro.AvroFlumeEvent;
-//import org.apache.flume.source.avro.Status;
-//
-//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
-// private static final long serialVersionUID = 1L;
-//
-// String host;
-// String port;
-// volatile boolean finished = false;
-//
-// private volatile boolean isRunning = false;
-//
-// FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
-// super(deserializationSchema);
-// this.host = host;
-// this.port = Integer.toString(port);
-// }
-//
-// public class MyAvroSource extends AvroSource {
-// Collector<OUT> output;
-//
-// /**
-// * Sends the AvroFlumeEvent from it's argument list to the Apache Flink
-// * {@link DataStream}.
-// *
-// * @param avroEvent
-// * The event that should be sent to the dataStream
-// * @return A {@link Status}.OK message if sending the event was
-// * successful.
-// */
-// @Override
-// public Status append(AvroFlumeEvent avroEvent) {
-// collect(avroEvent);
-// return Status.OK;
-// }
-//
-// /**
-// * Sends the AvroFlumeEvents from it's argument list to the Apache Flink
-// * {@link DataStream}.
-// *
-// * @param events
-// * The events that is sent to the dataStream
-// * @return A Status.OK message if sending the events was successful.
-// */
-// @Override
-// public Status appendBatch(List<AvroFlumeEvent> events) {
-// for (AvroFlumeEvent avroEvent : events) {
-// collect(avroEvent);
-// }
-//
-// return Status.OK;
-// }
-//
-// /**
-// * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
-// * {@link DataStream}.
-// *
-// * @param avroEvent
-// * The event that is sent to the dataStream
-// */
-// private void collect(AvroFlumeEvent avroEvent) {
-// byte[] b = avroEvent.getBody().array();
-// OUT out = FlumeSource.this.schema.deserialize(b);
-//
-// if (schema.isEndOfStream(out)) {
-// FlumeSource.this.finished = true;
-// this.stop();
-// FlumeSource.this.notifyAll();
-// } else {
-// output.collect(out);
-// }
-//
-// }
-//
-// }
-//
-// MyAvroSource avroSource;
-//
-// /**
-// * Configures the AvroSource. Also sets the output so the application can
-// * use it from outside of the invoke function.
-// *
-// * @param output
-// * The output used in the invoke function
-// */
-// public void configureAvroSource(Collector<OUT> output) {
-//
-// avroSource = new MyAvroSource();
-// avroSource.output = output;
-// Context context = new Context();
-// context.put("port", port);
-// context.put("bind", host);
-// avroSource.configure(context);
-// // An instance of a ChannelProcessor is required for configuring the
-// // avroSource although it will not be used in this case.
-// ChannelProcessor cp = new ChannelProcessor(null);
-// avroSource.setChannelProcessor(cp);
-// }
-//
-// /**
-// * Configures the AvroSource and runs until the user calls a close function.
-// *
-// * @param output
-// * The Collector for sending data to the datastream
-// */
-// @Override
-// public void run(Collector<OUT> output) throws Exception {
-// isRunning = true;
-// configureAvroSource(output);
-// avroSource.start();
-// while (!finished && isRunning) {
-// this.wait();
-// }
-// }
-//
-// @Override
-// public void cancel() {
-// isRunning = false;
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
deleted file mode 100644
index f630bce..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ /dev/null
@@ -1,49 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.util.serialization.SerializationSchema;
-//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-//
-//public class FlumeTopology {
-//
-// public static void main(String[] args) throws Exception {
-//
-// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-//
-// @SuppressWarnings("unused")
-// DataStream<String> dataStream1 = env.addSource(
-// new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
-// new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
-//
-// env.execute();
-// }
-//
-// public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
-//
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public byte[] serialize(String element) {
-// return element.getBytes();
-// }
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
deleted file mode 100644
index 0f16541..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files.
- *
- * @param <IN>
- * Type of the input elements.
- * @param <OUT>
- * Type of the returned elements.
- */
-public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- // private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
-
- /**
- * Get the value object associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public Object get(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).get("retValue");
- }
-
- /**
- * Get the boolean value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public boolean getBoolean(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getBoolean("retValue");
- }
-
- /**
- * Get the double value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public double getDouble(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getDouble("retValue");
- }
-
- /**
- * Get the int value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public int getInt(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getInt("retValue");
- }
-
- /**
- * Get the long value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public long getLong(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getLong("retValue");
- }
-
- /**
- * Get the String value associated with a key form a JSON code. It can find
- * embedded fields, too.
- *
- * @param jsonText
- * JSON String in which the field is searched.
- * @param field
- * The key whose value is searched for.
- * @return The object associated with the field.
- * @throws JSONException
- * If the field is not found.
- */
- public String getString(String jsonText, String field) throws JSONException {
- JSONParser parser = new JSONParser(jsonText);
-
- return parser.parse(field).getString("retValue");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
deleted file mode 100644
index c1eabbd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.StringTokenizer;
-
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-
-/**
- * A JSONParser contains a JSONObject and provides opportunity to access
- * embedded fields in JSON code.
- */
-public class JSONParser {
-
- private JSONObject originalJO;
- private String searchedfield;
- private Object temp;
-
- /**
- * Construct a JSONParser from a string. The string has to be a JSON code
- * from which we want to get a field.
- *
- * @param jsonText
- * A string which contains a JSON code. String representation of
- * a JSON code.
- * @throws JSONException
- * If there is a syntax error in the source string.
- */
- public JSONParser(String jsonText) throws JSONException {
- originalJO = new JSONObject(jsonText);
- }
-
- /**
- *
- * Parse the JSON code passed to the constructor to find the given key.
- *
- * @param key
- * The key whose value is searched for.
- * @return A JSONObject which has only one field called "retValue" and the
- * value associated to it is the searched value. The methods of
- * JSONObject can be used to get the field value in a desired
- * format.
- * @throws JSONException
- * If the key is not found.
- */
- public JSONObject parse(String key) throws JSONException {
- initializeParser(key);
- parsing();
- return putResultInJSONObj();
- }
-
- /**
- * Prepare the fields of the class for the parsing
- *
- * @param key
- * The key whose value is searched for.
- * @throws JSONException
- * If the key is not found.
- */
- private void initializeParser(String key) throws JSONException {
- searchedfield = key;
- temp = new JSONObject(originalJO.toString());
- }
-
- /**
- * This function goes through the given field and calls the appropriate
- * functions to treat the units between the punctuation marks.
- *
- * @throws JSONException
- * If the key is not found.
- */
- private void parsing() throws JSONException {
- StringTokenizer st = new StringTokenizer(searchedfield, ".");
- while (st.hasMoreTokens()) {
- find(st.nextToken());
- }
- }
-
- /**
- * Search for the next part of the field and update the state if it was
- * found.
- *
- * @param nextToken
- * The current part of the searched field.
- * @throws JSONException
- * If the key is not found.
- */
- private void find(String nextToken) throws JSONException {
- if (endsWithBracket(nextToken)) {
- treatAllBracket(nextToken);
- } else {
- temp = ((JSONObject) temp).get(nextToken);
- }
- }
-
- /**
- * Determine whether the given string ends with a closing square bracket ']'
- *
- * @param nextToken
- * The current part of the searched field.
- * @return True if the given string ends with a closing square bracket ']'
- * and false otherwise.
- */
- private boolean endsWithBracket(String nextToken) {
- return nextToken.substring(nextToken.length() - 1).endsWith("]");
- }
-
- /**
- * Handle (multidimensional) arrays. Treat the square bracket pairs one
- * after the other if necessary.
- *
- * @param nextToken
- * The current part of the searched field.
- * @throws JSONException
- * If the searched element is not found.
- */
- private void treatAllBracket(String nextToken) throws JSONException {
- List<String> list = Arrays.asList(nextToken.split("\\["));
- ListIterator<String> iter = list.listIterator();
-
- temp = ((JSONObject) temp).get(iter.next());
-
- while (iter.hasNext()) {
- int index = Integer.parseInt(cutBracket(iter.next()));
- temp = ((JSONArray) temp).get(index);
- }
- }
-
- /**
- * Remove the last character of the string.
- *
- * @param string
- * String to modify.
- * @return The given string without the last character.
- */
- private String cutBracket(String string) {
- return string.substring(0, string.length() - 1);
- }
-
- /**
- * Save the result of the search into a JSONObject.
- *
- * @return A special JSONObject which contain only one key. The value
- * associated to this key is the result of the search.
- * @throws JSONException
- * If there is a problem creating the JSONObject. (e.g. invalid
- * syntax)
- */
- private JSONObject putResultInJSONObj() throws JSONException {
- JSONObject jo = new JSONObject();
- jo.put("retValue", temp);
- return jo;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
deleted file mode 100644
index fe6684d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-public class KafkaConsumerExample {
-
- private static String host;
- private static int port;
- private static String topic;
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
-
- DataStream<String> kafkaStream = env
- .addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
- kafkaStream.print();
-
- env.execute();
- }
-
- private static boolean parseParameters(String[] args) {
- if (args.length == 3) {
- host = args[0];
- port = Integer.parseInt(args[1]);
- topic = args[2];
- return true;
- } else {
- System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
- return false;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
deleted file mode 100644
index 4dd5577..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-public class KafkaProducerExample {
-
- private static String host;
- private static int port;
- private static String topic;
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
-
- @SuppressWarnings({ "unused", "serial" })
- DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
-
- private int index = 0;
-
- @Override
- public boolean reachedEnd() throws Exception {
- return index >= 20;
- }
-
- @Override
- public String next() throws Exception {
- if (index < 20) {
- String result = "message #" + index;
- index++;
- return result;
- }
-
- return "q";
- }
-
- }).addSink(
- new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
- )
- .setParallelism(3);
-
- env.execute();
- }
-
- private static boolean parseParameters(String[] args) {
- if (args.length == 3) {
- host = args[0];
- port = Integer.parseInt(args[1]);
- topic = args[2];
- return true;
- } else {
- System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
deleted file mode 100644
index 4286196..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.io.IOException;
-
-public class Utils {
- public static class TypeInformationSerializationSchema<T>
- implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
- private final TypeSerializer<T> serializer;
- private final TypeInformation<T> ti;
-
- public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) {
- this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type);
- this.serializer = ti.createSerializer(ec);
- }
- @Override
- public T deserialize(byte[] message) {
- try {
- return serializer.deserialize(new ByteArrayInputView(message));
- } catch (IOException e) {
- throw new RuntimeException("Unable to deserialize message", e);
- }
- }
-
- @Override
- public boolean isEndOfStream(T nextElement) {
- return false;
- }
-
- @Override
- public byte[] serialize(T element) {
- DataOutputSerializer dos = new DataOutputSerializer(16);
- try {
- serializer.serialize(element, dos);
- } catch (IOException e) {
- throw new RuntimeException("Unable to serialize record", e);
- }
- return dos.getByteArray();
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return ti;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
deleted file mode 100644
index 0965b29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
-import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * @param <IN>
- * Type of the sink input
- */
-public class KafkaSink<IN> extends RichSinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
-
- private Producer<IN, byte[]> producer;
- private Properties userDefinedProperties;
- private String topicId;
- private String brokerList;
- private SerializationSchema<IN, byte[]> schema;
- private SerializableKafkaPartitioner partitioner;
- private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
-
- /**
- * Creates a KafkaSink for a given topic. The sink produces its input to
- * the topic.
- *
- * @param brokerList
- * Addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema.
- */
- public KafkaSink(String brokerList, String topicId,
- SerializationSchema<IN, byte[]> serializationSchema) {
- this(brokerList, topicId, new Properties(), serializationSchema);
- }
-
- /**
- * Creates a KafkaSink for a given topic with custom Producer configuration.
- * If you use this constructor, the broker should be set with the "metadata.broker.list"
- * configuration.
- *
- * @param brokerList
- * Addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param producerConfig
- * Configurations of the Kafka producer
- * @param serializationSchema
- * User defined serialization schema.
- */
- public KafkaSink(String brokerList, String topicId, Properties producerConfig,
- SerializationSchema<IN, byte[]> serializationSchema) {
- String[] elements = brokerList.split(",");
- for(String broker: elements) {
- NetUtils.ensureCorrectHostnamePort(broker);
- }
- Preconditions.checkNotNull(topicId, "TopicID not set");
-
- this.brokerList = brokerList;
- this.topicId = topicId;
- this.schema = serializationSchema;
- this.partitionerClass = null;
- this.userDefinedProperties = producerConfig;
- }
-
- /**
- * Creates a KafkaSink for a given topic. The sink produces its input to
- * the topic.
- *
- * @param brokerList
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema.
- * @param partitioner
- * User defined partitioner.
- */
- public KafkaSink(String brokerList, String topicId,
- SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
- this(brokerList, topicId, serializationSchema);
- ClosureCleaner.ensureSerializable(partitioner);
- this.partitioner = partitioner;
- }
-
- public KafkaSink(String brokerList,
- String topicId,
- SerializationSchema<IN, byte[]> serializationSchema,
- Class<? extends SerializableKafkaPartitioner> partitioner) {
- this(brokerList, topicId, serializationSchema);
- this.partitionerClass = partitioner;
- }
-
- /**
- * Initializes the connection to Kafka.
- */
- @Override
- public void open(Configuration configuration) {
-
- Properties properties = new Properties();
-
- properties.put("metadata.broker.list", brokerList);
- properties.put("request.required.acks", "-1");
- properties.put("message.send.max.retries", "10");
-
- properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-
- // this will not be used as the key will not be serialized
- properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
-
- for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
- properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
- }
-
- if (partitioner != null) {
- properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
- // java serialization will do the rest.
- properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
- }
- if (partitionerClass != null) {
- properties.put("partitioner.class", partitionerClass);
- }
-
- ProducerConfig config = new ProducerConfig(properties);
-
- try {
- producer = new Producer<IN, byte[]>(config);
- } catch (NullPointerException e) {
- throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
- }
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Kafka.
- *
- * @param next
- * The incoming data
- */
- @Override
- public void invoke(IN next) {
- byte[] serialized = schema.serialize(next);
-
- // Sending message without serializable key.
- producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
- }
-
- @Override
- public void close() {
- if (producer != null) {
- producer.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
deleted file mode 100644
index 00666f6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.base.Preconditions;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Source that listens to a Kafka topic using the high level Kafka API.
- *
- * @param <OUT>
- * Type of the messages on the topic.
- */
-public class KafkaSource<OUT> extends ConnectorSource<OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
-
- private final String zookeeperAddress;
- private final String groupId;
- private final String topicId;
- private Properties customProperties;
-
- private transient ConsumerConnector consumer;
- private transient ConsumerIterator<byte[], byte[]> consumerIterator;
-
- private long zookeeperSyncTimeMillis;
- private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
- private static final String DEFAULT_GROUP_ID = "flink-group";
-
- // We must read this in reachedEnd() to check for the end. We keep it to return it in
- // next()
- private OUT nextElement;
-
- /**
- * Creates a KafkaSource that consumes a topic.
- *
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
- * @param topicId
- * ID of the Kafka topic.
- * @param groupId
- * ID of the consumer group.
- * @param deserializationSchema
- * User defined deserialization schema.
- * @param zookeeperSyncTimeMillis
- * Synchronization time with zookeeper.
- */
- public KafkaSource(String zookeeperAddress,
- String topicId, String groupId,
- DeserializationSchema<OUT> deserializationSchema,
- long zookeeperSyncTimeMillis) {
- this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
- }
- /**
- * Creates a KafkaSource that consumes a topic.
- *
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
- * @param topicId
- * ID of the Kafka topic.
- * @param groupId
- * ID of the consumer group.
- * @param deserializationSchema
- * User defined deserialization schema.
- * @param zookeeperSyncTimeMillis
- * Synchronization time with zookeeper.
- * @param customProperties
- * Custom properties for Kafka
- */
- public KafkaSource(String zookeeperAddress,
- String topicId, String groupId,
- DeserializationSchema<OUT> deserializationSchema,
- long zookeeperSyncTimeMillis, Properties customProperties) {
- super(deserializationSchema);
- Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
- Preconditions.checkNotNull(topicId, "Topic ID is null");
- Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
- Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0, "The ZK sync time must be positive");
-
- this.zookeeperAddress = zookeeperAddress;
- this.groupId = groupId;
- this.topicId = topicId;
- this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
- this.customProperties = customProperties;
- }
-
- /**
- * Creates a KafkaSource that consumes a topic.
- *
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
- * @param topicId
- * ID of the Kafka topic.
- * @param deserializationSchema
- * User defined deserialization schema.
- * @param zookeeperSyncTimeMillis
- * Synchronization time with zookeeper.
- */
- public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
- this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
- }
- /**
- * Creates a KafkaSource that consumes a topic.
- *
- * @param zookeeperAddress
- * Address of the Zookeeper host (with port number).
- * @param topicId
- * ID of the Kafka topic.
- * @param deserializationSchema
- * User defined deserialization schema.
- */
- public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
- this(zookeeperAddress, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
- }
-
- /**
- * Initializes the connection to Kafka.
- */
- private void initializeConnection() {
- Properties props = new Properties();
- props.put("zookeeper.connect", zookeeperAddress);
- props.put("group.id", groupId);
- props.put("zookeeper.session.timeout.ms", "10000");
- props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
- props.put("auto.commit.interval.ms", "1000");
-
- if(customProperties != null) {
- for(Map.Entry<Object, Object> e : props.entrySet()) {
- if(props.contains(e.getKey())) {
- LOG.warn("Overwriting property "+e.getKey()+" with value "+e.getValue());
- }
- props.put(e.getKey(), e.getValue());
- }
- }
-
- consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
-
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
- .createMessageStreams(Collections.singletonMap(topicId, 1));
- List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
- KafkaStream<byte[], byte[]> stream = streams.get(0);
-
- consumer.commitOffsets();
-
- consumerIterator = stream.iterator();
- }
-
- @Override
- public void open(Configuration config) throws Exception {
- super.open(config);
- initializeConnection();
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (consumer != null) {
- consumer.shutdown();
- }
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- if (nextElement != null) {
- return false;
- } else if (consumerIterator.hasNext()) {
- OUT out = schema.deserialize(consumerIterator.next().message());
- if (schema.isEndOfStream(out)) {
- return true;
- }
- nextElement = out;
- }
- return false;
- }
-
- @Override
- public OUT next() throws Exception {
- if (!reachedEnd()) {
- OUT result = nextElement;
- nextElement = null;
- return result;
- } else {
- throw new RuntimeException("Source exhausted");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
deleted file mode 100644
index 7ae17df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.api.config;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties - map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
- * This is set in the key-value (java.util.Properties) map.
- * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable).
- * This is a hack because the put() method is called on the underlying Hashmap.
- *
- * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
- *
- * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
- */
-public class PartitionerWrapper implements Partitioner {
- public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
- private Partitioner wrapped;
- public PartitionerWrapper(VerifiableProperties properties) {
- wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
- }
-
- @Override
- public int partition(Object value, int numberOfPartitions) {
- return wrapped.partition(value, numberOfPartitions);
- }
-}