Posted to issues@flink.apache.org by rangadi <gi...@git.apache.org> on 2017/08/18 20:02:26 UTC

[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

Github user rangadi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4239#discussion_r134044031
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -0,0 +1,1000 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import org.apache.flink.api.common.functions.RuntimeContext;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.metrics.MetricGroup;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.util.SerializableObject;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSink;
    +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
    +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import org.apache.kafka.clients.producer.Callback;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.kafka.clients.producer.RecordMetadata;
    +import org.apache.kafka.common.Metric;
    +import org.apache.kafka.common.MetricName;
    +import org.apache.kafka.common.PartitionInfo;
    +import org.apache.kafka.common.errors.InvalidTxnStateException;
    +import org.apache.kafka.common.serialization.ByteArraySerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.BlockingDeque;
    +import java.util.concurrent.LinkedBlockingDeque;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the
    + * producer will use the {@link Semantic#EXACTLY_ONCE} semantic.
    + *
    + * <p>Implementation note: This producer is a hybrid between a regular
    + * {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} (a) and a custom operator (b).
    + *
    + * <p>Details about approach (a):
    + *  Because of the limitations of the regular {@link org.apache.flink.streaming.api.functions.sink.SinkFunction}
    + *  API, this variant does not allow accessing the timestamp attached to the record.
    + *
    + * <p>Details about approach (b):
    + *  Kafka 0.11 supports writing the timestamp attached to a record to Kafka. When using the
    + *  {@link FlinkKafkaProducer011#writeToKafkaWithTimestamps} method, the Kafka producer can access the internal
    + *  record timestamp of the record and write it to Kafka.
    + *
    + * <p>All methods and constructors in this class are marked with the approach they are needed for.
    + */
    +public class FlinkKafkaProducer011<IN>
    +		extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState> {
    +
    +	/**
    +	 *  Semantics that can be chosen.
    +	 *  <li>{@link #EXACTLY_ONCE}</li>
    +	 *  <li>{@link #AT_LEAST_ONCE}</li>
    +	 *  <li>{@link #NONE}</li>
    +	 */
    +	public enum Semantic {
    +		/**
    +		 * With Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction that will be
    +		 * committed to Kafka on a checkpoint.
    +		 *
    +		 * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between
    +		 * checkpoints a new Kafka transaction is created, which is committed on
    +		 * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
    +		 * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that
    +		 * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
    +		 * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from the previous checkpoint.
    +		 * To decrease the chances of failing checkpoints there are four options:
    +		 * <li>decrease the number of max concurrent checkpoints</li>
    +		 * <li>make checkpoints more reliable (so that they complete faster)</li>
    +		 * <li>increase the delay between checkpoints</li>
    +		 * <li>increase the size of the {@link FlinkKafkaProducer}s pool</li>
    +		 */
    +		EXACTLY_ONCE,
    +		/**
    +		 * With Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers
    +		 * to be acknowledged by the Kafka producer on a checkpoint.
    +		 */
    +		AT_LEAST_ONCE,
    +		/**
    +		 * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
    +		 * of failure.
    +		 */
    +		NONE
    +	}
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer011.class);
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	/**
    +	 * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
    +	 */
    +	public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
    +
    +	/**
    +	 * Configuration key for disabling the metrics reporting.
    +	 */
    +	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    +
    +	/**
    +	 * Descriptor of the transactionalIds list.
    +	 */
    +	private static final ListStateDescriptor<String> TRANSACTIONAL_IDS_DESCRIPTOR =
    +		new ListStateDescriptor<>("transactional-ids", TypeInformation.of(String.class));
    +
    +	/**
    +	 * Pool of transactional ids backed up in state.
    +	 */
    +	private ListState<String> transactionalIdsState;
    +
    +	/**
    +	 * Already used transactional ids.
    +	 */
    +	private final Set<String> usedTransactionalIds = new HashSet<>();
    +
    +	/**
    +	 * Transactional ids that are available for use.
    +	 */
    +	private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
    +
    +	/**
    +	 * User defined properties for the Producer.
    +	 */
    +	private final Properties producerConfig;
    +
    +	/**
    +	 * The name of the default topic this producer is writing data to.
    +	 */
    +	private final String defaultTopicId;
    +
    +	/**
    +	 * (Serializable) SerializationSchema for turning the objects used with Flink into
    +	 * byte[] for Kafka.
    +	 */
    +	private final KeyedSerializationSchema<IN> schema;
    +
    +	/**
    +	 * User-provided partitioner for assigning an object to a Kafka partition for each topic.
    +	 */
    +	private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    +
    +	/**
    +	 * Partitions of each topic.
    +	 */
    +	private final Map<String, int[]> topicPartitionsMap;
    +
    +	/**
    +	 * Max number of producers in the pool. If all producers are in use, snapshotting state will throw an exception.
    +	 */
    +	private final int kafkaProducersPoolSize;
    +
    +	/**
    +	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
    +	 */
    +	private boolean writeTimestampToKafka = false;
    +
    +	/**
    +	 * Flag indicating whether to accept failures (and log them), or to fail on failures.
    +	 */
    +	private boolean logFailuresOnly;
    +
    +	/**
    +	 * Semantic chosen for this instance.
    +	 */
    +	private Semantic semantic;
    +
    +	/**
    +	 * Pool of KafkaProducer objects.
    +	 */
    +	private transient ProducersPool producersPool = new ProducersPool();
    +
    +	// -------------------------------- Runtime fields ------------------------------------------
    +
    +	/** The callback that handles error propagation or logging. */
    +	@Nullable
    +	private transient Callback callback;
    +
    +	/** Errors encountered in the async producer are stored here. */
    +	@Nullable
    +	private transient volatile Exception asyncException;
    +
    +	/** Lock for accessing the pending records. */
    +	private final SerializableObject pendingRecordsLock = new SerializableObject();
    +
    +	/** Number of unacknowledged records. */
    +	private final AtomicLong pendingRecords = new AtomicLong();
    +
    +	/** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
    +	private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
    +
    +	// ---------------------- "Constructors" for timestamp writing ------------------
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 * @param inStream The stream to write to Kafka
    +	 * @param topicId ID of the Kafka topic.
    +	 * @param serializationSchema User defined serialization schema supporting key/value messages
    +	 * @param producerConfig Properties with the producer configuration.
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream,
    +																					String topicId,
    +																					KeyedSerializationSchema<IN> serializationSchema,
    +																					Properties producerConfig) {
    +		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 * @param inStream The stream to write to Kafka
    +	 * @param topicId ID of the Kafka topic.
    +	 * @param serializationSchema User defined (keyless) serialization schema.
    +	 * @param producerConfig Properties with the producer configuration.
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream,
    +																					String topicId,
    +																					SerializationSchema<IN> serializationSchema,
    +																					Properties producerConfig) {
    +		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 *  @param inStream The stream to write to Kafka
    +	 *  @param topicId The name of the target topic
    +	 *  @param serializationSchema A serializable serialization schema for turning user objects into a
    +	 *                             kafka-consumable byte[] supporting key/value messages
    +	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
    +	 *                        required argument.
    +	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream,
    +																					String topicId,
    +																					KeyedSerializationSchema<IN> serializationSchema,
    +																					Properties producerConfig,
    +																					FlinkKafkaPartitioner<IN> customPartitioner) {
    +		return writeToKafkaWithTimestamps(
    +			inStream,
    +			topicId,
    +			serializationSchema,
    +			producerConfig,
    +			customPartitioner,
    +			Semantic.EXACTLY_ONCE,
    +			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
    +	 *
    +	 *  @param inStream The stream to write to Kafka
    +	 *  @param topicId The name of the target topic
    +	 *  @param serializationSchema A serializable serialization schema for turning user objects into a
    +	 *                             kafka-consumable byte[] supporting key/value messages
    +	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only
    +	 *                        required argument.
    +	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
    +	 *  @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
    +	 *  @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
    +	 */
    +	public static <IN> FlinkKafkaProducer011Configuration<IN> writeToKafkaWithTimestamps(DataStream<IN> inStream,
    +																					String topicId,
    +																					KeyedSerializationSchema<IN> serializationSchema,
    +																					Properties producerConfig,
    +																					FlinkKafkaPartitioner<IN> customPartitioner,
    +																					Semantic semantic,
    +																					int kafkaProducersPoolSize) {
    +
    +		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
    +		FlinkKafkaProducer011<IN> kafkaProducer =
    +			new FlinkKafkaProducer011<>(
    +				topicId,
    +				serializationSchema,
    +				producerConfig,
    +				customPartitioner,
    +				semantic,
    +				kafkaProducersPoolSize);
    +		KafkaStreamSink streamSink = new KafkaStreamSink(kafkaProducer);
    +		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinkKafkaProducer 0.11.x", objectTypeInfo, streamSink);
    +		return new FlinkKafkaProducer011Configuration<>(transformation, streamSink);
    +	}
    +
    +	// ---------------------- Regular constructors w/o timestamp support  ------------------
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param brokerList
    +	 *			Comma separated addresses of the brokers
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined (keyless) serialization schema.
    +	 */
    +	public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
    +		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined (keyless) serialization schema.
    +	 * @param producerConfig
    +	 * 			Properties with the producer configuration.
    +	 */
    +	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
    +		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param topicId The topic to write data to
    +	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
    +	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
    +	 */
    +	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
    +		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
    +	}
    +
    +	// ------------------- Key/Value serialization schema constructors ----------------------
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param brokerList
    +	 *			Comma separated addresses of the brokers
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined serialization schema supporting key/value messages
    +	 */
    +	public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
    +		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
    +	 * the topic.
    +	 *
    +	 * @param topicId
    +	 * 			ID of the Kafka topic.
    +	 * @param serializationSchema
    +	 * 			User defined serialization schema supporting key/value messages
    +	 * @param producerConfig
    +	 * 			Properties with the producer configuration.
    +	 */
    +	public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
    +		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
    +	}
    +
    +	/**
    +	 * The main constructor for creating a FlinkKafkaProducer.
    +	 *
    +	 * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
    +	 *
    +	 * @param defaultTopicId The default topic to write data to
    +	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
    +	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +	 */
    +	public FlinkKafkaProducer011(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
    +		this(
    +			defaultTopicId,
    +			serializationSchema,
    +			producerConfig,
    +			customPartitioner,
    +			Semantic.EXACTLY_ONCE,
    +			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    +	}
    +
    +	/**
    +	 * The main constructor for creating a FlinkKafkaProducer.
    +	 *
    +	 * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
    +	 *
    +	 * @param defaultTopicId The default topic to write data to
    +	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
    +	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
    +	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
    +	 * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
    +	 * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
    +	 */
    +	public FlinkKafkaProducer011(
    +			String defaultTopicId,
    +			KeyedSerializationSchema<IN> serializationSchema,
    +			Properties producerConfig,
    +			FlinkKafkaPartitioner<IN> customPartitioner,
    +			Semantic semantic,
    +			int kafkaProducersPoolSize) {
    +		super(
    +			TypeInformation.of(KafkaTransactionState.class),
    +			TypeInformation.of(new TypeHint<List<KafkaTransactionState>>() {}));
    +
    +		requireNonNull(defaultTopicId, "TopicID not set");
    +		requireNonNull(serializationSchema, "serializationSchema not set");
    +		requireNonNull(producerConfig, "producerConfig not set");
    +		ClosureCleaner.clean(customPartitioner, true);
    +		ClosureCleaner.ensureSerializable(serializationSchema);
    +
    +		this.defaultTopicId = defaultTopicId;
    +		this.schema = serializationSchema;
    +		this.producerConfig = producerConfig;
    +		this.flinkKafkaPartitioner = customPartitioner;
    +		this.semantic = semantic;
    +		this.kafkaProducersPoolSize = kafkaProducersPoolSize;
    +
    +		// set the producer configuration properties for kafka record key value serializers.
    +		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
    +			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
    +		} else {
    +			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
    +		}
    +
    +		if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    +			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
    +		} else {
    +			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
    +		}
    +
    +		// eagerly ensure that bootstrap servers are set.
    +		if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
    +			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
    +		}
    +
    +		this.topicPartitionsMap = new HashMap<>();
    +	}
    +
    +	// ---------------------------------- Properties --------------------------
    +
    +	/**
    +	 * Defines whether the producer should fail on errors, or only log them.
    +	 * If this is set to true, then exceptions will only be logged; if set to false,
    +	 * exceptions will eventually be thrown and cause the streaming program to
    +	 * fail (and enter recovery).
    +	 *
    +	 * <p>Method is only accessible for approach (a) (see above)
    +	 *
    +	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
    +	 */
    +	public void setLogFailuresOnly(boolean logFailuresOnly) {
    +		this.logFailuresOnly = logFailuresOnly;
    +	}
    +
    +	// ----------------------------------- Utilities --------------------------
    +
    +	/**
    +	 * Initializes the connection to Kafka.
    +	 *
    +	 * <p>This method is used for approach (a) (see above).
    +	 */
    +	@Override
    +	public void open(Configuration configuration) throws Exception {
    +		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
    +			LOG.warn(String.format("Using [%s] semantic, but checkpointing is not enabled. Switching to [%s] semantic.", semantic, Semantic.NONE));
    +			semantic = Semantic.NONE;
    +		}
    +
    +		if (logFailuresOnly) {
    +			callback = new Callback() {
    +				@Override
    +				public void onCompletion(RecordMetadata metadata, Exception e) {
    +					if (e != null) {
    +						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
    +					}
    +					acknowledgeMessage();
    +				}
    +			};
    +		}
    +		else {
    +			callback = new Callback() {
    +				@Override
    +				public void onCompletion(RecordMetadata metadata, Exception exception) {
    +					if (exception != null && asyncException == null) {
    +						asyncException = exception;
    +					}
    +					acknowledgeMessage();
    +				}
    +			};
    +		}
    +
    +		super.open(configuration);
    +	}
    +
    +	@Override
    +	public void invoke(KafkaTransactionState transaction, IN next) throws Exception {
    +		invokeInternal(transaction, next, Long.MAX_VALUE);
    +	}
    +
    +	private void invokeInternal(KafkaTransactionState transaction, IN next, long elementTimestamp) throws Exception {
    +		checkErroneous();
    +
    +		byte[] serializedKey = schema.serializeKey(next);
    +		byte[] serializedValue = schema.serializeValue(next);
    +		String targetTopic = schema.getTargetTopic(next);
    +		if (targetTopic == null) {
    +			targetTopic = defaultTopicId;
    +		}
    +
    +		Long timestamp = null;
    +		if (this.writeTimestampToKafka) {
    +			timestamp = elementTimestamp;
    +		}
    +
    +		ProducerRecord<byte[], byte[]> record;
    +		int[] partitions = topicPartitionsMap.get(targetTopic);
    +		if (null == partitions) {
    +			partitions = getPartitionsByTopic(targetTopic, transaction.producer);
    +			topicPartitionsMap.put(targetTopic, partitions);
    +		}
    +		if (flinkKafkaPartitioner == null) {
    +			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
    +		} else {
    +			record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
    +		}
    +		pendingRecords.incrementAndGet();
    +		transaction.producer.send(record, callback);
    +	}
    +
    +	@Override
    +	public void close() throws Exception {
    +		if (currentTransaction != null) {
    +			// to avoid exceptions on aborting transactions with some pending records
    +			flush(currentTransaction);
    +		}
    +		try {
    +			super.close();
    +		}
    +		catch (Exception e) {
    +			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
    +		}
    +		try {
    +			producersPool.close();
    +		}
    +		catch (Exception e) {
    +			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
    +		}
    +		// make sure we propagate pending errors
    +		checkErroneous();
    +	}
    +
    +	// ------------------- Logic for handling checkpoint flushing -------------------------- //
    +
    +	@Override
    +	protected KafkaTransactionState beginTransaction() throws Exception {
    +		switch (semantic) {
    +			case EXACTLY_ONCE:
    +				FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
    +				if (producer == null) {
    +					String transactionalId = availableTransactionalIds.poll();
    +					if (transactionalId == null) {
    +						throw new Exception(
    +							"Too many ongoing snapshots. Increase the Kafka producers pool size or decrease the number of concurrent checkpoints.");
    +					}
    +					usedTransactionalIds.add(transactionalId);
    +					producer = initTransactionalProducer(transactionalId, true);
    +					producer.initTransactions();
    +				}
    +				producer.beginTransaction();
    +				return new KafkaTransactionState(producer.getTransactionalId(), producer);
    +			case AT_LEAST_ONCE:
    +			case NONE:
    +				// Do not create new producer on each beginTransaction() if it is not necessary
    +				if (currentTransaction != null && currentTransaction.producer != null) {
    +					return new KafkaTransactionState(currentTransaction.producer);
    +				}
    +				return new KafkaTransactionState(initProducer(true));
    +			default:
    +				throw new UnsupportedOperationException("Not implemented semantic");
    +		}
    +	}
    +
    +	@Override
    +	protected void preCommit(KafkaTransactionState transaction) throws Exception {
    +		switch (semantic) {
    +			case EXACTLY_ONCE:
    +			case AT_LEAST_ONCE:
    +				flush(transaction);
    +				break;
    +			case NONE:
    +				break;
    +			default:
    +				throw new UnsupportedOperationException("Not implemented semantic");
    +		}
    +		checkErroneous();
    +	}
    +
    +	@Override
    +	protected void commit(KafkaTransactionState transaction) {
    +		switch (semantic) {
    +			case EXACTLY_ONCE:
    +				transaction.producer.commitTransaction();
    +				producersPool.add(transaction.producer);
    +				break;
    +			case AT_LEAST_ONCE:
    +			case NONE:
    +				break;
    +			default:
    +				throw new UnsupportedOperationException("Not implemented semantic");
    +		}
    +	}
    +
    +	@Override
    +	protected void recoverAndCommit(KafkaTransactionState transaction) {
    +		switch (semantic) {
    +			case EXACTLY_ONCE:
    +				KafkaTransactionState kafkaTransaction = transaction;
    +				FlinkKafkaProducer<byte[], byte[]> producer =
    +					initTransactionalProducer(kafkaTransaction.transactionalId, false);
    +				producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch);
    +				try {
    +					producer.commitTransaction();
    +					producer.close();
    +				}
    +				catch (InvalidTxnStateException ex) {
    +					// That means we have committed this transaction before.
    +					LOG.warn("Encountered error [{}] while recovering transaction [{}]. " +
    +						"Presumably this transaction has already been committed before",
    +						ex,
    +						transaction);
    +				}
    +				break;
    +			case AT_LEAST_ONCE:
    +			case NONE:
    +				break;
    +			default:
    +				throw new UnsupportedOperationException("Not implemented semantic");
    +		}
    +	}
    +
    +	@Override
    +	protected void abort(KafkaTransactionState transaction) {
    +		switch (semantic) {
    +			case EXACTLY_ONCE:
    +				transaction.producer.abortTransaction();
    +				producersPool.add(transaction.producer);
    +				break;
    +			case AT_LEAST_ONCE:
    +			case NONE:
    +				producersPool.add(transaction.producer);
    +				break;
    +			default:
    +				throw new UnsupportedOperationException("Not implemented semantic");
    +		}
    +	}
    +
    +	@Override
    +	protected void recoverAndAbort(KafkaTransactionState transaction) {
    +		switch (semantic) {
    +			case EXACTLY_ONCE:
    +				FlinkKafkaProducer<byte[], byte[]> producer =
    +					initTransactionalProducer(transaction.transactionalId, false);
    +				producer.resumeTransaction(transaction.producerId, transaction.epoch);
    +				producer.abortTransaction();
    +				producer.close();
    +				break;
    +			case AT_LEAST_ONCE:
    +			case NONE:
    +				break;
    +			default:
    +				throw new UnsupportedOperationException("Not implemented semantic");
    +		}
    +	}
    +
    +	private void acknowledgeMessage() {
    +		pendingRecords.decrementAndGet();
    +	}
    +
    +	/**
    +	 * Flush pending records.
    +	 * @param transaction The transaction whose pending records should be flushed.
    +	 */
    +	private void flush(KafkaTransactionState transaction) throws Exception {
    +		if (transaction.producer != null) {
    +			transaction.producer.flush();
    +		}
    +		long pendingRecordsCount = pendingRecords.get();
    +		if (pendingRecordsCount != 0) {
    +			throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
    +		}
    +
    +		// if the flushed requests have errors, we should propagate them and fail the checkpoint
    +		checkErroneous();
    +	}
    +
    +	@Override
    +	public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +		super.snapshotState(context);
    +
    +		transactionalIdsState.clear();
    +		for (String transactionalId : availableTransactionalIds) {
    +			transactionalIdsState.add(transactionalId);
    +		}
    +		for (String transactionalId : usedTransactionalIds) {
    +			transactionalIdsState.add(transactionalId);
    +		}
    +	}
    +
    +	@Override
    +	public void initializeState(FunctionInitializationContext context) throws Exception {
    +		availableTransactionalIds.clear();
    +		for (int i = 0; i < kafkaProducersPoolSize; i++) {
    +			availableTransactionalIds.add(UUID.randomUUID().toString());
    --- End diff --
    
    Probably better to reuse the stored ids rather than creating new ones each time. I am thinking of a case where a task goes into a crash loop, dying even before the first commit.
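    
    Something like the following is what I have in mind. Just a sketch, assuming transactionalIdsState has already been restored from the operator state at this point of initializeState() (the rest of that method is cut off in this diff), and using only the names that already exist in this class:
    
        if (context.isRestored()) {
            // first reuse the transactional ids stored by a previous (possibly crashed) execution
            for (String restoredId : transactionalIdsState.get()) {
                if (availableTransactionalIds.size() < kafkaProducersPoolSize) {
                    availableTransactionalIds.add(restoredId);
                }
            }
        }
        // only top up the pool with fresh ids if the restored state did not provide enough
        while (availableTransactionalIds.size() < kafkaProducersPoolSize) {
            availableTransactionalIds.add(UUID.randomUUID().toString());
        }
    
    That way a task that keeps crashing before its first commit would reuse the same transactional ids on every restart instead of generating a fresh set each time.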

