Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:58 UTC
[08/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
deleted file mode 100644
index 25040eb..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internal;
-
-import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread.
- */
-public class HandoverTest {
-
- // ------------------------------------------------------------------------
- // test producer / consumer
- // ------------------------------------------------------------------------
-
- @Test
- public void testWithVariableProducer() throws Exception {
- runProducerConsumerTest(500, 2, 0);
- }
-
- @Test
- public void testWithVariableConsumer() throws Exception {
- runProducerConsumerTest(500, 0, 2);
- }
-
- @Test
- public void testWithVariableBoth() throws Exception {
- runProducerConsumerTest(500, 2, 2);
- }
-
- // ------------------------------------------------------------------------
- // test error propagation
- // ------------------------------------------------------------------------
-
- @Test
- public void testPublishErrorOnEmptyHandover() throws Exception {
- final Handover handover = new Handover();
-
- Exception error = new Exception();
- handover.reportError(error);
-
- try {
- handover.pollNext();
- fail("should throw an exception");
- }
- catch (Exception e) {
- assertEquals(error, e);
- }
- }
-
- @Test
- public void testPublishErrorOnFullHandover() throws Exception {
- final Handover handover = new Handover();
- handover.produce(createTestRecords());
-
- IOException error = new IOException();
- handover.reportError(error);
-
- try {
- handover.pollNext();
- fail("should throw an exception");
- }
- catch (Exception e) {
- assertEquals(error, e);
- }
- }
-
- @Test
- public void testExceptionMarksClosedOnEmpty() throws Exception {
- final Handover handover = new Handover();
-
- IllegalStateException error = new IllegalStateException();
- handover.reportError(error);
-
- try {
- handover.produce(createTestRecords());
- fail("should throw an exception");
- }
- catch (Handover.ClosedException e) {
- // expected
- }
- }
-
- @Test
- public void testExceptionMarksClosedOnFull() throws Exception {
- final Handover handover = new Handover();
- handover.produce(createTestRecords());
-
- LinkageError error = new LinkageError();
- handover.reportError(error);
-
- try {
- handover.produce(createTestRecords());
- fail("should throw an exception");
- }
- catch (Handover.ClosedException e) {
- // expected
- }
- }
-
- // ------------------------------------------------------------------------
- // test closing behavior
- // ------------------------------------------------------------------------
-
- @Test
- public void testCloseEmptyForConsumer() throws Exception {
- final Handover handover = new Handover();
- handover.close();
-
- try {
- handover.pollNext();
- fail("should throw an exception");
- }
- catch (Handover.ClosedException e) {
- // expected
- }
- }
-
- @Test
- public void testCloseFullForConsumer() throws Exception {
- final Handover handover = new Handover();
- handover.produce(createTestRecords());
- handover.close();
-
- try {
- handover.pollNext();
- fail("should throw an exception");
- }
- catch (Handover.ClosedException e) {
- // expected
- }
- }
-
- @Test
- public void testCloseEmptyForProducer() throws Exception {
- final Handover handover = new Handover();
- handover.close();
-
- try {
- handover.produce(createTestRecords());
- fail("should throw an exception");
- }
- catch (Handover.ClosedException e) {
- // expected
- }
- }
-
- @Test
- public void testCloseFullForProducer() throws Exception {
- final Handover handover = new Handover();
- handover.produce(createTestRecords());
- handover.close();
-
- try {
- handover.produce(createTestRecords());
- fail("should throw an exception");
- }
- catch (Handover.ClosedException e) {
- // expected
- }
- }
-
- // ------------------------------------------------------------------------
- // test wake up behavior
- // ------------------------------------------------------------------------
-
- @Test
- public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
- Handover handover = new Handover();
- handover.wakeupProducer();
-
- // produce into a woken but empty handover
- try {
- handover.produce(createTestRecords());
- }
- catch (Handover.WakeupException e) {
- fail();
- }
-
- // the handover now has records; the next wakeup-then-produce must
- // throw an exception
- handover.wakeupProducer();
- try {
- handover.produce(createTestRecords());
- fail("should throw an exception");
- }
- catch (Handover.WakeupException e) {
- // expected
- }
-
- // empty the handover
- assertNotNull(handover.pollNext());
-
- // producing into an empty handover should work
- try {
- handover.produce(createTestRecords());
- }
- catch (Handover.WakeupException e) {
- fail();
- }
- }
-
- @Test
- public void testWakeupWakesOnlyOnce() throws Exception {
- // create a full handover
- final Handover handover = new Handover();
- handover.produce(createTestRecords());
-
- handover.wakeupProducer();
-
- try {
- handover.produce(createTestRecords());
- fail();
- } catch (WakeupException e) {
- // expected
- }
-
- CheckedThread producer = new CheckedThread() {
- @Override
- public void go() throws Exception {
- handover.produce(createTestRecords());
- }
- };
- producer.start();
-
- // the producer must go blocking
- producer.waitUntilThreadHoldsLock(10000);
-
- // release the thread by consuming something
- assertNotNull(handover.pollNext());
- producer.sync();
- }
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
- // generate test data
- @SuppressWarnings({"unchecked", "rawtypes"})
- final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords];
- for (int i = 0; i < numRecords; i++) {
- data[i] = createTestRecords();
- }
-
- final Handover handover = new Handover();
-
- ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay);
- ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay);
-
- consumer.start();
- producer.start();
-
- // sync first on the consumer, so it propagates assertion errors
- consumer.sync();
- producer.sync();
- }
-
- @SuppressWarnings("unchecked")
- private static ConsumerRecords<byte[], byte[]> createTestRecords() {
- return mock(ConsumerRecords.class);
- }
-
- // ------------------------------------------------------------------------
-
- private static abstract class CheckedThread extends Thread {
-
- private volatile Throwable error;
-
- public abstract void go() throws Exception;
-
- @Override
- public void run() {
- try {
- go();
- }
- catch (Throwable t) {
- error = t;
- }
- }
-
- public void sync() throws Exception {
- join();
- if (error != null) {
- ExceptionUtils.rethrowException(error, error.getMessage());
- }
- }
-
- public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
- final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
-
- while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
- Thread.sleep(1);
- }
-
- if (!isBlockedOrWaiting()) {
- throw new TimeoutException();
- }
- }
-
- private boolean isBlockedOrWaiting() {
- State state = getState();
- return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
- }
- }
-
- private static class ProducerThread extends CheckedThread {
-
- private final Random rnd = new Random();
- private final Handover handover;
- private final ConsumerRecords<byte[], byte[]>[] data;
- private final int maxDelay;
-
- private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
- this.handover = handover;
- this.data = data;
- this.maxDelay = maxDelay;
- }
-
- @Override
- public void go() throws Exception {
- for (ConsumerRecords<byte[], byte[]> rec : data) {
- handover.produce(rec);
-
- if (maxDelay > 0) {
- int delay = rnd.nextInt(maxDelay);
- Thread.sleep(delay);
- }
- }
- }
- }
-
- private static class ConsumerThread extends CheckedThread {
-
- private final Random rnd = new Random();
- private final Handover handover;
- private final ConsumerRecords<byte[], byte[]>[] data;
- private final int maxDelay;
-
- private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
- this.handover = handover;
- this.data = data;
- this.maxDelay = maxDelay;
- }
-
- @Override
- public void go() throws Exception {
- for (ConsumerRecords<byte[], byte[]> rec : data) {
- ConsumerRecords<byte[], byte[]> next = handover.pollNext();
-
- assertEquals(rec, next);
-
- if (maxDelay > 0) {
- int delay = rnd.nextInt(maxDelay);
- Thread.sleep(delay);
- }
- }
- }
- }
-}
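
The deleted test above pins down the Handover's contract: a single-slot, blocking exchange between the Kafka consumer thread (producer side) and the fetcher's main thread (consumer side). produce() blocks while a batch is still pending, pollNext() blocks while the slot is empty, and reportError(), close(), and wakeupProducer() unblock whichever side is waiting. A minimal sketch of just the produce()/pollNext() exchange — the class name is invented for illustration, and the error/close/wakeup paths are omitted:

import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * Hypothetical single-slot blocking handover, illustrating only the
 * produce()/pollNext() exchange exercised by HandoverTest above.
 */
public final class SimpleHandover {

	private final Object lock = new Object();

	private ConsumerRecords<byte[], byte[]> next;

	/** Called by the Kafka consumer thread; blocks while a batch is pending. */
	public void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
		synchronized (lock) {
			while (next != null) {
				lock.wait();
			}
			next = records;
			lock.notifyAll(); // wake a blocked pollNext()
		}
	}

	/** Called by the fetcher's main thread; blocks while the slot is empty. */
	public ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
		synchronized (lock) {
			while (next == null) {
				lock.wait();
			}
			ConsumerRecords<byte[], byte[]> records = next;
			next = null;
			lock.notifyAll(); // wake a blocked produce()
			return records;
		}
	}
}
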
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
deleted file mode 100644
index 4ac1773..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-log4j.logger.org.apache.zookeeper=OFF, testlogger
-log4j.logger.state.change.logger=OFF, testlogger
-log4j.logger.kafka=OFF, testlogger
-
-log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml b/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
deleted file mode 100644
index ef71bde..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/pom.xml
+++ /dev/null
@@ -1,212 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-kafka-base_2.10</artifactId>
- <name>flink-connector-kafka-base</name>
-
- <packaging>jar</packaging>
-
- <!-- Allow users to pass custom connector versions -->
- <properties>
- <kafka.version>0.8.2.2</kafka.version>
- </properties>
-
- <dependencies>
-
- <!-- core dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- <!-- Projects depending on this project
- won't depend on flink-table. -->
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.yammer.metrics</groupId>
- <artifactId>metrics-annotation</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- test dependencies -->
-
- <!-- force using the latest zkclient -->
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.7</version>
- <type>jar</type>
- <scope>test</scope>
- </dependency>
-
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-test</artifactId>
- <version>${curator.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metrics-jmx</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.10</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minikdc</artifactId>
- <version>${minikdc.version}</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.7</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!--
- https://issues.apache.org/jira/browse/DIRSHARED-134
- Required to pull the Mini-KDC transitive dependency
- -->
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>3.0.1</version>
- <inherited>true</inherited>
- <extensions>true</extensions>
- </plugin>
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
deleted file mode 100644
index aef7116..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class of all Flink Kafka Consumer data sources.
- * This implements the common behavior across all Kafka versions.
- *
- * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
- * {@link AbstractFetcher}.
- *
- * @param <T> The type of records produced by this data source
- */
-public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
- CheckpointListener,
- ResultTypeQueryable<T>,
- CheckpointedFunction {
- private static final long serialVersionUID = -6272159445203409112L;
-
- protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
-
- /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
- public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
- /** Boolean configuration key to disable metrics tracking **/
- public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
- // ------------------------------------------------------------------------
- // configuration state, set on the client relevant for all subtasks
- // ------------------------------------------------------------------------
-
- private final List<String> topics;
-
- /** The schema to convert between Kafka's byte messages and Flink's objects */
- protected final KeyedDeserializationSchema<T> deserializer;
-
- /** The set of topic partitions that the source will read */
- protected List<KafkaTopicPartition> subscribedPartitions;
-
- /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
- * to exploit per-partition timestamp characteristics.
- * The assigner is kept in serialized form, to deserialize it into multiple copies */
- private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
-
- /** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
- * to exploit per-partition timestamp characteristics.
- * The assigner is kept in serialized form, to deserialize it into multiple copies */
- private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
-
- private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
-
- // ------------------------------------------------------------------------
- // runtime state (used individually by each parallel subtask)
- // ------------------------------------------------------------------------
-
- /** Data for pending but uncommitted offsets */
- private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
- /** The fetcher implements the connections to the Kafka brokers */
- private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
-
- /** The offsets to restore to, if the consumer restores state from a checkpoint */
- private transient volatile HashMap<KafkaTopicPartition, Long> restoreToOffset;
-
- /** Flag indicating whether the consumer is still running **/
- private volatile boolean running = true;
-
- // ------------------------------------------------------------------------
-
- /**
- * Base constructor.
- *
- * @param deserializer
- * The deserializer to turn raw byte messages into Java/Scala objects.
- */
- public FlinkKafkaConsumerBase(List<String> topics, KeyedDeserializationSchema<T> deserializer) {
- this.topics = checkNotNull(topics);
- checkArgument(topics.size() > 0, "You have to define at least one topic.");
- this.deserializer = checkNotNull(deserializer, "valueDeserializer");
- }
-
- /**
- * This method must be called from the subclasses, to set the list of all subscribed partitions
- * that this consumer will fetch from (across all subtasks).
- *
- * @param allSubscribedPartitions The list of all partitions that all subtasks together should fetch from.
- */
- protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
- checkNotNull(allSubscribedPartitions);
- this.subscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
- }
-
- // ------------------------------------------------------------------------
- // Configuration
- // ------------------------------------------------------------------------
-
- /**
- * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
- * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
- * in the same way as in the Flink runtime, when streams are merged.
- *
- * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
- * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
- * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
- * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
- * parallel source subtask reads more than one partition.
- *
- * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
- * partition, allows users to let them exploit the per-partition characteristics.
- *
- * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
- * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
- *
- * @param assigner The timestamp assigner / watermark generator to use.
- * @return The consumer object, to allow function chaining.
- */
- public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
- checkNotNull(assigner);
-
- if (this.periodicWatermarkAssigner != null) {
- throw new IllegalStateException("A periodic watermark emitter has already been set.");
- }
- try {
- ClosureCleaner.clean(assigner, true);
- this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
- return this;
- } catch (Exception e) {
- throw new IllegalArgumentException("The given assigner is not serializable", e);
- }
- }
-
- /**
- * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic manner.
- * The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
- * in the same way as in the Flink runtime, when streams are merged.
- *
- * <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
- * the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
- * characteristics are usually lost that way. For example, if the timestamps are strictly ascending
- * per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
- * parallel source subtask reads more than one partition.
- *
- * <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
- * partition, allows users to let them exploit the per-partition characteristics.
- *
- * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
- * {@link AssignerWithPeriodicWatermarks}, not both at the same time.
- *
- * @param assigner The timestamp assigner / watermark generator to use.
- * @return The consumer object, to allow function chaining.
- */
- public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
- checkNotNull(assigner);
-
- if (this.punctuatedWatermarkAssigner != null) {
- throw new IllegalStateException("A punctuated watermark emitter has already been set.");
- }
- try {
- ClosureCleaner.clean(assigner, true);
- this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
- return this;
- } catch (Exception e) {
- throw new IllegalArgumentException("The given assigner is not serializable", e);
- }
- }
-
- // ------------------------------------------------------------------------
- // Work methods
- // ------------------------------------------------------------------------
-
- @Override
- public void run(SourceContext<T> sourceContext) throws Exception {
- if (subscribedPartitions == null) {
- throw new Exception("The partitions were not set for the consumer");
- }
-
- // we only need to do work if we actually have partitions assigned
- if (!subscribedPartitions.isEmpty()) {
-
- // (1) create the fetcher that will communicate with the Kafka brokers
- final AbstractFetcher<T, ?> fetcher = createFetcher(
- sourceContext, subscribedPartitions,
- periodicWatermarkAssigner, punctuatedWatermarkAssigner,
- (StreamingRuntimeContext) getRuntimeContext());
-
- // (2) set the fetcher to the restored checkpoint offsets
- if (restoreToOffset != null) {
- fetcher.restoreOffsets(restoreToOffset);
- }
-
- // publish the reference, for snapshot-, commit-, and cancel calls
- // IMPORTANT: We can only do that now, because only now will calls to
- // the fetchers 'snapshotCurrentState()' method return at least
- // the restored offsets
- this.kafkaFetcher = fetcher;
- if (!running) {
- return;
- }
-
- // (3) run the fetcher's main work method
- fetcher.runFetchLoop();
- }
- else {
- // this source never completes, so emit a Long.MAX_VALUE watermark
- // to not block watermark forwarding
- sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-
- // wait until this is canceled
- final Object waitLock = new Object();
- while (running) {
- try {
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (waitLock) {
- waitLock.wait();
- }
- }
- catch (InterruptedException e) {
- if (!running) {
- // restore the interrupted state, and fall through the loop
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- }
-
- @Override
- public void cancel() {
- // set ourselves as not running
- running = false;
-
- // abort the fetcher, if there is one
- if (kafkaFetcher != null) {
- kafkaFetcher.cancel();
- }
-
- // there will be an interrupt() call to the main thread anyways
- }
-
- @Override
- public void open(Configuration configuration) {
- List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
-
- if (kafkaTopicPartitions != null) {
- assignTopicPartitions(kafkaTopicPartitions);
- }
- }
-
- @Override
- public void close() throws Exception {
- // pretty much the same logic as cancelling
- try {
- cancel();
- } finally {
- super.close();
- }
- }
-
- // ------------------------------------------------------------------------
- // Checkpoint and restore
- // ------------------------------------------------------------------------
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
-
- OperatorStateStore stateStore = context.getOperatorStateStore();
- offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
-
- if (context.isRestored()) {
- restoreToOffset = new HashMap<>();
- for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
- restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
- }
-
- LOG.info("Setting restore state in the FlinkKafkaConsumer.");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using the following offsets: {}", restoreToOffset);
- }
- } else {
- LOG.info("No restore state for FlinkKafkaConsumer.");
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- if (!running) {
- LOG.debug("snapshotState() called on closed source");
- } else {
-
- offsetsStateForCheckpoint.clear();
-
- final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
- if (fetcher == null) {
- // the fetcher has not yet been initialized, which means we need to return the
- // originally restored offsets or the assigned partitions
-
- if (restoreToOffset != null) {
-
- for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
- offsetsStateForCheckpoint.add(
- Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
- }
- } else if (subscribedPartitions != null) {
- for (KafkaTopicPartition subscribedPartition : subscribedPartitions) {
- offsetsStateForCheckpoint.add(
- Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
- }
- }
-
- // the map cannot be asynchronously updated, because only one checkpoint call can happen
- // on this function at a time: either snapshotState() or notifyCheckpointComplete()
- pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
- } else {
- HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
-
- // the map cannot be asynchronously updated, because only one checkpoint call can happen
- // on this function at a time: either snapshotState() or notifyCheckpointComplete()
- pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
-
- for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
- offsetsStateForCheckpoint.add(
- Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
- }
- }
-
- // truncate the map of pending offsets to commit, to prevent infinite growth
- while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
- pendingOffsetsToCommit.remove(0);
- }
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- if (!running) {
- LOG.debug("notifyCheckpointComplete() called on closed source");
- return;
- }
-
- final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
- if (fetcher == null) {
- LOG.debug("notifyCheckpointComplete() called on uninitialized source");
- return;
- }
-
- // only one commit operation must be in progress
- if (LOG.isDebugEnabled()) {
- LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
- }
-
- try {
- final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
- if (posInMap == -1) {
- LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
- return;
- }
-
- @SuppressWarnings("unchecked")
- HashMap<KafkaTopicPartition, Long> offsets =
- (HashMap<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
-
- // remove older checkpoints in map
- for (int i = 0; i < posInMap; i++) {
- pendingOffsetsToCommit.remove(0);
- }
-
- if (offsets == null || offsets.size() == 0) {
- LOG.debug("Checkpoint state was empty.");
- return;
- }
- fetcher.commitInternalOffsetsToKafka(offsets);
- }
- catch (Exception e) {
- if (running) {
- throw e;
- }
- // else ignore exception if we are no longer running
- }
- }
-
- // ------------------------------------------------------------------------
- // Kafka Consumer specific methods
- // ------------------------------------------------------------------------
-
- /**
- * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the
- * data, and emits it into the data streams.
- *
- * @param sourceContext The source context to emit data to.
- * @param thisSubtaskPartitions The set of partitions that this subtask should handle.
- * @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
- * @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
- * @param runtimeContext The task's runtime context.
- *
- * @return The instantiated fetcher
- *
- * @throws Exception The method should forward exceptions
- */
- protected abstract AbstractFetcher<T, ?> createFetcher(
- SourceContext<T> sourceContext,
- List<KafkaTopicPartition> thisSubtaskPartitions,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
- SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
- StreamingRuntimeContext runtimeContext) throws Exception;
-
- protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> topics);
-
- // ------------------------------------------------------------------------
- // ResultTypeQueryable methods
- // ------------------------------------------------------------------------
-
- @Override
- public TypeInformation<T> getProducedType() {
- return deserializer.getProducedType();
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
- subscribedPartitions = new ArrayList<>();
-
- if (restoreToOffset != null) {
- for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
- if (restoreToOffset.containsKey(kafkaTopicPartition)) {
- subscribedPartitions.add(kafkaTopicPartition);
- }
- }
- } else {
- Collections.sort(kafkaTopicPartitions, new Comparator<KafkaTopicPartition>() {
- @Override
- public int compare(KafkaTopicPartition o1, KafkaTopicPartition o2) {
- int topicComparison = o1.getTopic().compareTo(o2.getTopic());
-
- if (topicComparison == 0) {
- return o1.getPartition() - o2.getPartition();
- } else {
- return topicComparison;
- }
- }
- });
-
- for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
- subscribedPartitions.add(kafkaTopicPartitions.get(i));
- }
- }
- }
-
- /**
- * Selects which of the given partitions should be handled by a specific consumer,
- * given a certain number of consumers.
- *
- * @param allPartitions The partitions to select from
- * @param numConsumers The number of consumers
- * @param consumerIndex The index of the specific consumer
- *
- * @return The sublist of partitions to be handled by that consumer.
- */
- protected static List<KafkaTopicPartition> assignPartitions(
- List<KafkaTopicPartition> allPartitions,
- int numConsumers, int consumerIndex) {
- final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
- allPartitions.size() / numConsumers + 1);
-
- for (int i = 0; i < allPartitions.size(); i++) {
- if (i % numConsumers == consumerIndex) {
- thisSubtaskPartitions.add(allPartitions.get(i));
- }
- }
-
- return thisSubtaskPartitions;
- }
-
- /**
- * Logs the partition information at INFO level.
- *
- * @param logger The logger to log to.
- * @param partitionInfos List of subscribed partitions
- */
- protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
- Map<String, Integer> countPerTopic = new HashMap<>();
- for (KafkaTopicPartition partition : partitionInfos) {
- Integer count = countPerTopic.get(partition.getTopic());
- if (count == null) {
- count = 1;
- } else {
- count++;
- }
- countPerTopic.put(partition.getTopic(), count);
- }
- StringBuilder sb = new StringBuilder(
- "Consumer is going to read the following topics (with number of partitions): ");
-
- for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
- sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
- }
-
- logger.info(sb.toString());
- }
-}
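
The assignTimestampsAndWatermarks() variants deleted above exist so that the extractor runs inside the source, per Kafka partition, before the per-partition streams are unioned. A hedged usage sketch — FlinkKafkaConsumer09 is assumed as the concrete subclass (it is not part of this file), and the epoch-millis payload is invented for illustration:

import java.util.Properties;

import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PerPartitionWatermarkExample {

	public static FlinkKafkaConsumerBase<String> createConsumer() {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
		props.setProperty("group.id", "example-group");           // assumed group id

		FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
				"example-topic", new SimpleStringSchema(), props);

		// The assigner runs per Kafka partition inside the source, so strictly
		// ascending per-partition timestamps stay usable for watermarking even
		// when one subtask reads several partitions.
		return consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
			@Override
			public long extractAscendingTimestamp(String element) {
				return Long.parseLong(element); // assumes records carry epoch millis
			}
		});
	}
}
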
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
deleted file mode 100644
index d413f1c..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
-import static java.util.Objects.requireNonNull;
-
-
-/**
- * Flink Sink to produce data into a Kafka topic.
- *
- * Please note that this producer provides at-least-once reliability guarantees when
- * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
- * Otherwise, the producer doesn't provide any reliability guarantees.
- *
- * @param <IN> Type of the messages to write into Kafka.
- */
-public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
-
- private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Configuration key for disabling the metrics reporting
- */
- public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
-
- /**
- * Array with the partition ids of the given defaultTopicId
- * The size of this array is the number of partitions
- */
- protected int[] partitions;
-
- /**
- * User defined properties for the Producer
- */
- protected final Properties producerConfig;
-
- /**
- * The name of the default topic this producer is writing data to
- */
- protected final String defaultTopicId;
-
- /**
- * (Serializable) SerializationSchema for turning objects used with Flink into
- * byte[] for Kafka.
- */
- protected final KeyedSerializationSchema<IN> schema;
-
- /**
- * User-provided partitioner for assigning an object to a Kafka partition.
- */
- protected final KafkaPartitioner<IN> partitioner;
-
- /**
- * Flag indicating whether to accept failures (and log them), or to fail on failures
- */
- protected boolean logFailuresOnly;
-
- /**
- * If true, the producer will wait until all outstanding records have been sent to the broker.
- */
- protected boolean flushOnCheckpoint;
-
- // -------------------------------- Runtime fields ------------------------------------------
-
- /** KafkaProducer instance */
- protected transient KafkaProducer<byte[], byte[]> producer;
-
- /** The callback that handles error propagation or failure logging */
- protected transient Callback callback;
-
- /** Errors encountered in the async producer are stored here */
- protected transient volatile Exception asyncException;
-
- /** Lock for accessing the pending records */
- protected final SerializableObject pendingRecordsLock = new SerializableObject();
-
- /** Number of unacknowledged records. */
- protected long pendingRecords;
-
- protected OperatorStateStore stateStore;
-
-
- /**
- * The main constructor for creating a FlinkKafkaProducer.
- *
- * @param defaultTopicId The default topic to write data to
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
- */
- public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- requireNonNull(defaultTopicId, "TopicID not set");
- requireNonNull(serializationSchema, "serializationSchema not set");
- requireNonNull(producerConfig, "producerConfig not set");
- ClosureCleaner.clean(customPartitioner, true);
- ClosureCleaner.ensureSerializable(serializationSchema);
-
- this.defaultTopicId = defaultTopicId;
- this.schema = serializationSchema;
- this.producerConfig = producerConfig;
-
- // set the producer configuration properties for kafka record key value serializers.
- if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
- this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
- } else {
- LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- }
-
- if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
- this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
- } else {
- LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- }
-
- // eagerly ensure that bootstrap servers are set.
- if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
- throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
- }
-
- this.partitioner = customPartitioner;
- }
-
- // ---------------------------------- Properties --------------------------
-
- /**
- * Defines whether the producer should fail on errors, or only log them.
- * If this is set to true, then exceptions will be only logged, if set to false,
- * exceptions will be eventually thrown and cause the streaming program to
- * fail (and enter recovery).
- *
- * @param logFailuresOnly The flag to indicate logging-only on exceptions.
- */
- public void setLogFailuresOnly(boolean logFailuresOnly) {
- this.logFailuresOnly = logFailuresOnly;
- }
-
- /**
- * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
- * to be acknowledged by the Kafka producer on a checkpoint.
- * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
- *
- * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
- */
- public void setFlushOnCheckpoint(boolean flush) {
- this.flushOnCheckpoint = flush;
- }
-
- /**
- * Used for testing only
- */
- protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
- return new KafkaProducer<>(props);
- }
-
- // ----------------------------------- Utilities --------------------------
-
- /**
- * Initializes the connection to Kafka.
- */
- @Override
- public void open(Configuration configuration) {
- producer = getKafkaProducer(this.producerConfig);
-
- // the fetched list is immutable, so we're creating a mutable copy in order to sort it
- List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
- // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
- Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
- @Override
- public int compare(PartitionInfo o1, PartitionInfo o2) {
- return Integer.compare(o1.partition(), o2.partition());
- }
- });
-
- partitions = new int[partitionsList.size()];
- for (int i = 0; i < partitions.length; i++) {
- partitions[i] = partitionsList.get(i).partition();
- }
-
- RuntimeContext ctx = getRuntimeContext();
- if (partitioner != null) {
- partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
- }
-
- LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}",
- ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
-
- // register Kafka metrics to Flink accumulators
- if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
- Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
-
- if (metrics == null) {
- // MapR's Kafka implementation returns null here.
- LOG.info("Producer implementation does not support metrics");
- } else {
- final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
- for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
- kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
- }
- }
- }
-
- if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
- LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
- flushOnCheckpoint = false;
- }
-
- if (logFailuresOnly) {
- callback = new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception e) {
- if (e != null) {
- LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
- }
- acknowledgeMessage();
- }
- };
- }
- else {
- callback = new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null && asyncException == null) {
- asyncException = exception;
- }
- acknowledgeMessage();
- }
- };
- }
- }
-
- /**
- * Called when new data arrives at the sink, and forwards it to Kafka.
- *
- * @param next
- * The incoming data
- */
- @Override
- public void invoke(IN next) throws Exception {
- // propagate asynchronous errors
- checkErroneous();
-
- byte[] serializedKey = schema.serializeKey(next);
- byte[] serializedValue = schema.serializeValue(next);
- String targetTopic = schema.getTargetTopic(next);
- if (targetTopic == null) {
- targetTopic = defaultTopicId;
- }
-
- ProducerRecord<byte[], byte[]> record;
- if (partitioner == null) {
- record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
- } else {
- record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
- }
- if (flushOnCheckpoint) {
- synchronized (pendingRecordsLock) {
- pendingRecords++;
- }
- }
- producer.send(record, callback);
- }
-
-
- @Override
- public void close() throws Exception {
- if (producer != null) {
- producer.close();
- }
-
- // make sure we propagate pending errors
- checkErroneous();
- }
-
- // ------------------- Logic for handling checkpoint flushing -------------------------- //
-
- private void acknowledgeMessage() {
- if (flushOnCheckpoint) {
- synchronized (pendingRecordsLock) {
- pendingRecords--;
- if (pendingRecords == 0) {
- pendingRecordsLock.notifyAll();
- }
- }
- }
- }
-
- /**
- * Flush pending records.
- */
- protected abstract void flush();
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- this.stateStore = context.getOperatorStateStore();
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
- if (flushOnCheckpoint) {
- // flushing is activated: we need to wait until pendingRecords is 0
- flush();
- synchronized (pendingRecordsLock) {
- if (pendingRecords != 0) {
- throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
- }
- // pending records count is 0. We can now confirm the checkpoint
- }
- }
- }
-
- // ----------------------------------- Utilities --------------------------
-
- protected void checkErroneous() throws Exception {
- Exception e = asyncException;
- if (e != null) {
- // prevent double throwing
- asyncException = null;
- throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
- }
- }
-
- public static Properties getPropertiesFromBrokerList(String brokerList) {
- String[] elements = brokerList.split(",");
-
- // validate the broker addresses
- for (String broker: elements) {
- NetUtils.getCorrectHostnamePort(broker);
- }
-
- Properties props = new Properties();
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- return props;
- }
-}
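
(A reading aid for the deleted producer base class above: acknowledgeMessage() and
snapshotState() implement a monitor-guarded in-flight counter -- the send path
increments it, the async callback decrements it, and the version-specific flush()
is expected to block until it reaches zero, which is why snapshotState() may assert
the count is exactly zero afterwards. Below is a minimal, self-contained sketch of
that pattern; the class and method names are ours, not Flink's.)

    // Standalone sketch of the pending-records pattern; not Flink code.
    public class PendingRecordsTracker {

        private final Object lock = new Object();
        private long pendingRecords;

        /** Called on the send path, before handing a record to the async producer. */
        public void recordSent() {
            synchronized (lock) {
                pendingRecords++;
            }
        }

        /** Called from the producer's completion callback. */
        public void recordAcknowledged() {
            synchronized (lock) {
                pendingRecords--;
                if (pendingRecords == 0) {
                    lock.notifyAll();
                }
            }
        }

        /** Blocks until every in-flight record is acknowledged (what flush() must guarantee). */
        public void awaitAllAcknowledged() throws InterruptedException {
            synchronized (lock) {
                while (pendingRecords > 0) {
                    lock.wait();
                }
            }
        }
    }

The while-loop around wait() guards against spurious wakeups; once
awaitAllAcknowledged() returns, the counter is provably zero under the same lock.
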
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
deleted file mode 100644
index ee98783..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.util.Properties;
-
-/**
- * Base class for a {@link KafkaTableSink} that serializes table rows in JSON format.
- */
-public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
- /**
- * Creates a KafkaJsonTableSink.
- *
- * @param topic Kafka topic to which the table is written
- * @param properties properties used to connect to Kafka
- * @param partitioner Kafka partitioner
- */
- public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
-
- @Override
- protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
- return new JsonRowSerializationSchema(fieldNames);
- }
-}
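
(To make the abstract surface above concrete: a version-specific subclass only has
to supply the producer and a copy of itself. A hypothetical sketch against the 0.9
connector; the subclass name and the FlinkKafkaProducer09 constructor shape are
assumptions based on that connector's API, not part of this diff.)

    import java.util.Properties;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.SerializationSchema;

    // Hypothetical version-specific JSON sink for the 0.9 connector.
    public class Kafka09JsonTableSink extends KafkaJsonTableSink {

        public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
            super(topic, properties, partitioner);
        }

        @Override
        protected FlinkKafkaProducerBase<Row> createKafkaProducer(
                String topic, Properties properties,
                SerializationSchema<Row> serializationSchema,
                KafkaPartitioner<Row> partitioner) {
            // Delegate to the 0.9 producer; the serialization schema is the
            // JSON schema created by KafkaJsonTableSink.
            return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
        }

        @Override
        protected KafkaTableSink createCopy() {
            // Copy used by configure(); the configured fields are re-applied on the copy.
            return new Kafka09JsonTableSink(topic, properties, partitioner);
        }
    }
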
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
deleted file mode 100644
index f145509..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka JSON {@link StreamTableSource}.
- *
- * <p>The version-specific table sources need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
- *
- * <p>The field names and types are used to parse the incoming JSON records.
- */
-public abstract class KafkaJsonTableSource extends KafkaTableSource {
-
- /**
- * Creates a generic Kafka JSON {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- KafkaJsonTableSource(
- String topic,
- Properties properties,
- String[] fieldNames,
- Class<?>[] fieldTypes) {
-
- super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
- }
-
- /**
- * Creates a generic Kafka JSON {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- KafkaJsonTableSource(
- String topic,
- Properties properties,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
- }
-
- /**
- * Configures the failure behaviour if a JSON field is missing.
- *
- * <p>By default, a missing field is ignored and the field is set to null.
- *
- * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
- */
- public void setFailOnMissingField(boolean failOnMissingField) {
- JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
- deserializationSchema.setFailOnMissingField(failOnMissingField);
- }
-
- private static JsonRowDeserializationSchema createDeserializationSchema(
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
- }
-
- private static JsonRowDeserializationSchema createDeserializationSchema(
- String[] fieldNames,
- Class<?>[] fieldTypes) {
-
- return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
- }
-}
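
(A short usage sketch for the JSON source: the field names/types describe the
columns to pull out of each JSON record, and setFailOnMissingField() flips the
null-padding default. Kafka09JsonTableSource stands in for a hypothetical
version-specific subclass with a public constructor; topic, servers, and columns
are illustrative.)

    import java.util.Properties;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class JsonTableSourceExample {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "demo");

            // Columns to extract from each JSON record (names and types are illustrative).
            String[] fieldNames = {"user", "score"};
            TypeInformation<?>[] fieldTypes = {
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};

            KafkaJsonTableSource source =
                    new Kafka09JsonTableSource("events", props, fieldNames, fieldTypes);

            // Fail hard instead of null-padding when a field is absent from a record.
            source.setFailOnMissingField(true);
        }
    }
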
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
deleted file mode 100644
index 714d9cd..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sinks.StreamTableSink;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka {@link StreamTableSink}.
- *
- * <p>The version-specific table sinks need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
- */
-public abstract class KafkaTableSink implements StreamTableSink<Row> {
-
- protected final String topic;
- protected final Properties properties;
- protected SerializationSchema<Row> serializationSchema;
- protected final KafkaPartitioner<Row> partitioner;
- protected String[] fieldNames;
- protected TypeInformation[] fieldTypes;
-
- /**
- * Creates a KafkaTableSink.
- *
- * @param topic Kafka topic to write to.
- * @param properties Properties for the Kafka producer.
- * @param partitioner Partitioner to select the Kafka partition for each row.
- */
- public KafkaTableSink(
- String topic,
- Properties properties,
- KafkaPartitioner<Row> partitioner) {
-
- this.topic = Preconditions.checkNotNull(topic, "topic");
- this.properties = Preconditions.checkNotNull(properties, "properties");
- this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
- }
-
- /**
- * Returns the version-specific Kafka producer.
- *
- * @param topic Kafka topic to produce to.
- * @param properties Properties for the Kafka producer.
- * @param serializationSchema Serialization schema to use to create Kafka records.
- * @param partitioner Partitioner to select Kafka partition.
- * @return The version-specific Kafka producer
- */
- protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic, Properties properties,
- SerializationSchema<Row> serializationSchema,
- KafkaPartitioner<Row> partitioner);
-
- /**
- * Creates the serialization schema for converting table rows into bytes.
- *
- * @param fieldNames Field names in table rows.
- * @return Instance of serialization schema
- */
- protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
-
- /**
- * Creates a deep copy of this sink.
- *
- * @return Deep copy of this sink
- */
- protected abstract KafkaTableSink createCopy();
-
- @Override
- public void emitDataStream(DataStream<Row> dataStream) {
- FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
- dataStream.addSink(kafkaProducer);
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- return new RowTypeInfo(getFieldTypes());
- }
-
- public String[] getFieldNames() {
- return fieldNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
- }
-
- @Override
- public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- KafkaTableSink copy = createCopy();
- copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
- copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
- Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types does not match.");
- copy.serializationSchema = createSerializationSchema(fieldNames);
-
- return copy;
- }
-}
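
(Worth spelling out: configure() deliberately returns a configured deep copy rather
than mutating the sink, so one instance can serve as a reusable template. A sketch
of that contract; Kafka09JsonTableSink is the hypothetical subclass from before,
and the trivial partitioner exists only to satisfy the constructor.)

    import java.util.Properties;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class TableSinkConfigureExample {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");

            // Trivial partitioner, just for the sketch: everything goes to partition 0.
            KafkaPartitioner<Row> partitioner = new KafkaPartitioner<Row>() {
                @Override
                public int partition(Row next, byte[] key, byte[] value, int numPartitions) {
                    return 0;
                }
            };

            KafkaTableSink template = new Kafka09JsonTableSink("out-topic", props, partitioner);

            // configure() leaves `template` untouched and returns a copy that carries
            // the field names, the types, and a freshly created serialization schema.
            KafkaTableSink configured = template.configure(
                    new String[] {"user", "score"},
                    new TypeInformation<?>[] {
                            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO});
        }
    }
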
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
deleted file mode 100644
index fd423d7..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.sources.StreamTableSource;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Properties;
-
-import static org.apache.flink.streaming.connectors.kafka.internals.TypeUtil.toTypeInfo;
-
-/**
- * A version-agnostic Kafka {@link StreamTableSource}.
- *
- * <p>The version-specific table sources need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
- */
-public abstract class KafkaTableSource implements StreamTableSource<Row> {
-
- /** The Kafka topic to consume. */
- private final String topic;
-
- /** Properties for the Kafka consumer. */
- private final Properties properties;
-
- /** Deserialization schema to use for Kafka records. */
- private final DeserializationSchema<Row> deserializationSchema;
-
- /** Row field names. */
- private final String[] fieldNames;
-
- /** Row field types. */
- private final TypeInformation<?>[] fieldTypes;
-
- /**
- * Creates a generic Kafka {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- KafkaTableSource(
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- String[] fieldNames,
- Class<?>[] fieldTypes) {
-
- this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
- }
-
- /**
- * Creates a generic Kafka {@link StreamTableSource}.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @param fieldNames Row field names.
- * @param fieldTypes Row field types.
- */
- KafkaTableSource(
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema,
- String[] fieldNames,
- TypeInformation<?>[] fieldTypes) {
-
- this.topic = Preconditions.checkNotNull(topic, "Topic");
- this.properties = Preconditions.checkNotNull(properties, "Properties");
- this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
- this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
- this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
-
- Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
- "Number of provided field names and types does not match.");
- }
-
- /**
- * NOTE: This method is for internal use only when defining a TableSource.
- * Do not use it in Table API programs.
- */
- @Override
- public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
- // create the version-specific Kafka consumer and use it as the stream source
- FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
- return env.addSource(kafkaConsumer);
- }
-
- @Override
- public int getNumberOfFields() {
- return fieldNames.length;
- }
-
- @Override
- public String[] getFieldsNames() {
- return fieldNames;
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
- }
-
- @Override
- public TypeInformation<Row> getReturnType() {
- return new RowTypeInfo(fieldTypes);
- }
-
- /**
- * Returns the version-specific Kafka consumer.
- *
- * @param topic Kafka topic to consume.
- * @param properties Properties for the Kafka consumer.
- * @param deserializationSchema Deserialization schema to use for Kafka records.
- * @return The version-specific Kafka consumer
- */
- abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
- String topic,
- Properties properties,
- DeserializationSchema<Row> deserializationSchema);
-
- /**
- * Returns the deserialization schema.
- *
- * @return The deserialization schema
- */
- protected DeserializationSchema<Row> getDeserializationSchema() {
- return deserializationSchema;
- }
-}
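
(And the consumer-side counterpart: a version-specific source only has to supply
the consumer. Because the constructor and getKafkaConsumer() above are
package-private, such a subclass lives in the same package; the subclass name and
the FlinkKafkaConsumer09 constructor shape are assumptions based on the 0.9
connector's API.)

    package org.apache.flink.streaming.connectors.kafka;

    import java.util.Properties;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    // Hypothetical 0.9 table source; only the consumer factory is version-specific.
    public class Kafka09TableSource extends KafkaTableSource {

        public Kafka09TableSource(
                String topic,
                Properties properties,
                DeserializationSchema<Row> deserializationSchema,
                String[] fieldNames,
                TypeInformation<?>[] fieldTypes) {
            super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
        }

        @Override
        FlinkKafkaConsumerBase<Row> getKafkaConsumer(
                String topic,
                Properties properties,
                DeserializationSchema<Row> deserializationSchema) {
            // Delegate to the 0.9 consumer.
            return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
        }
    }
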