Posted to commits@flink.apache.org by al...@apache.org on 2017/10/09 16:59:12 UTC

[6/9] flink git commit: [FLINK-6988][kafka] Implement our own KafkaProducer class with transactions recovery

[FLINK-6988][kafka] Implement our own KafkaProducer class with transactions recovery


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20728ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20728ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20728ba

Branch: refs/heads/master
Commit: d20728ba46977704827252ee5029bef9f949d5ab
Parents: 7a35c35
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Jul 12 15:14:13 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/internal/FlinkKafkaProducer.java      | 294 +++++++++++++++++++
 .../kafka/FlinkKafkaProducerTests.java          | 114 +++++++
 2 files changed, 408 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
new file mode 100644
index 0000000..56b40d7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around KafkaProducer that allows resuming transactions in case of node failure, which in turn allows
+ * implementing a two-phase commit algorithm for an exactly-once FlinkKafkaProducer.
+ *
+ * <p>For the happy path, usage is exactly the same as with {@link org.apache.kafka.clients.producer.KafkaProducer}. The
+ * user is expected to call:
+ *
+ * <ul>
+ *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ *     <li>{@link FlinkKafkaProducer#flush()}</li>
+ *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>To actually implement two-phase commit, it must be possible to always commit a transaction after pre-committing
+ * it (here, the pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of a failure between
+ * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()}, this class allows resuming
+ * the interrupted transaction and committing it after a restart:
+ *
+ * <ul>
+ *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ *     <li>{@link FlinkKafkaProducer#flush()}</li>
+ *     <li>{@link FlinkKafkaProducer#getProducerId()}</li>
+ *     <li>{@link FlinkKafkaProducer#getEpoch()}</li>
+ *     <li>node failure... restore producerId and epoch from state</li>
+ *     <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
+ *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
+ * as the way to obtain the producerId and epoch counters. This is necessary because
+ * {@link FlinkKafkaProducer#initTransactions()} would otherwise automatically abort all ongoing transactions.
+ *
+ * <p>The second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer}
+ * is that it flushes the newly added partitions already on {@link FlinkKafkaProducer#flush()} instead of only on
+ * {@link FlinkKafkaProducer#commitTransaction()}.
+ *
+ * <p>The last, minor difference is that it allows obtaining the producerId and epoch counters via the
+ * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods (the wrapped
+ * {@link org.apache.kafka.clients.producer.KafkaProducer} unfortunately keeps them in private fields).
+ *
+ * <p>Those changes are compatible with Kafka's 0.11.0 API, although it clearly was not the intention of Kafka's
+ * API authors to make them possible.
+ *
+ * <p>Internally, this implementation wraps {@link org.apache.kafka.clients.producer.KafkaProducer} and implements the
+ * required changes via the Java Reflection API. It might not be the prettiest solution, but the alternative would be
+ * to re-implement the whole Kafka 0.11 producer client on our own.
+ */
+public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+	private final KafkaProducer<K, V> kafkaProducer;
+
+	@Nullable
+	private final String transactionalId;
+
+	public FlinkKafkaProducer(Properties properties) {
+		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+		kafkaProducer = new KafkaProducer<>(properties);
+	}
+
+	// -------------------------------- Simple proxy method calls --------------------------------
+
+	@Override
+	public void initTransactions() {
+		kafkaProducer.initTransactions();
+	}
+
+	@Override
+	public void beginTransaction() throws ProducerFencedException {
+		kafkaProducer.beginTransaction();
+	}
+
+	@Override
+	public void commitTransaction() throws ProducerFencedException {
+		kafkaProducer.commitTransaction();
+	}
+
+	@Override
+	public void abortTransaction() throws ProducerFencedException {
+		kafkaProducer.abortTransaction();
+	}
+
+	@Override
+	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
+		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+	}
+
+	@Override
+	public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+		return kafkaProducer.send(record);
+	}
+
+	@Override
+	public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+		return kafkaProducer.send(record, callback);
+	}
+
+	@Override
+	public List<PartitionInfo> partitionsFor(String topic) {
+		return kafkaProducer.partitionsFor(topic);
+	}
+
+	@Override
+	public Map<MetricName, ? extends Metric> metrics() {
+		return kafkaProducer.metrics();
+	}
+
+	@Override
+	public void close() {
+		kafkaProducer.close();
+	}
+
+	@Override
+	public void close(long timeout, TimeUnit unit) {
+		kafkaProducer.close(timeout, unit);
+	}
+
+	// -------------------------------- New methods or methods with changed behaviour --------------------------------
+
+	@Override
+	public void flush() {
+		kafkaProducer.flush();
+		if (transactionalId != null) {
+			flushNewPartitions();
+		}
+	}
+
+	public void resumeTransaction(long producerId, short epoch) {
+		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId %s and epoch %s", producerId, epoch);
+		LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
+
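+		// Use reflection to walk KafkaProducer's private TransactionManager through the same state transitions
+		// as initTransactions() would, but with the recovered producerId and epoch instead of freshly assigned
+		// ones, so that the pending transaction is not aborted.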
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+
+		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+		invoke(sequenceNumbers, "clear");
+
+		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+		setValue(producerIdAndEpoch, "producerId", producerId);
+		setValue(producerIdAndEpoch, "epoch", epoch);
+
+		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+		setValue(transactionManager, "transactionStarted", true);
+	}
+
+	public String getTransactionalId() {
+		return transactionalId;
+	}
+
+	public long getProducerId() {
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+		return (long) getValue(producerIdAndEpoch, "producerId");
+	}
+
+	public short getEpoch() {
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+		return (short) getValue(producerIdAndEpoch, "epoch");
+	}
+
+	@VisibleForTesting
+	public int getTransactionCoordinatorId() {
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Node node = (Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+		return node.id();
+	}
+
+	private void flushNewPartitions() {
+		LOG.info("Flushing new partitions");
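+		// Enqueue an AddPartitionsToTxn request for the partitions recorded by the transaction manager, wake up
+		// the sender thread so the request gets transmitted, and block until the broker has acknowledged it.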
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+		Object sender = getValue(kafkaProducer, "sender");
+		invoke(sender, "wakeup");
+		result.await();
+	}
+
+	private static Enum<?> getEnum(String enumFullName) {
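+		// Split only on the last '.', so that the enum's fully qualified class name (which itself contains
+		// dots) stays intact and the last segment becomes the constant name.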
+		String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
+		if (x.length == 2) {
+			String enumClassName = x[0];
+			String enumName = x[1];
+			try {
+				Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
+				return Enum.valueOf(cl, enumName);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Incompatible KafkaProducer version", e);
+			}
+		}
+		return null;
+	}
+
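+	// Derives the parameter types from the runtime classes of the arguments; this only finds the target method
+	// when its declared parameter types match exactly. Otherwise the overload below with explicit argTypes has
+	// to be used (as in flushNewPartitions()).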
+	private static Object invoke(Object object, String methodName, Object... args) {
+		Class<?>[] argTypes = new Class[args.length];
+		for (int i = 0; i < args.length; i++) {
+			argTypes[i] = args[i].getClass();
+		}
+		return invoke(object, methodName, argTypes, args);
+	}
+
+	private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
+		try {
+			Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
+			method.setAccessible(true);
+			return method.invoke(object, args);
+		} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+			throw new RuntimeException("Incompatible KafkaProducer version", e);
+		}
+	}
+
+	private static Object getValue(Object object, String fieldName) {
+		return getValue(object, object.getClass(), fieldName);
+	}
+
+	private static Object getValue(Object object, Class<?> clazz, String fieldName) {
+		try {
+			Field field = clazz.getDeclaredField(fieldName);
+			field.setAccessible(true);
+			return field.get(object);
+		} catch (NoSuchFieldException | IllegalAccessException e) {
+			throw new RuntimeException("Incompatible KafkaProducer version", e);
+		}
+	}
+
+	private static void setValue(Object object, String fieldName, Object value) {
+		try {
+			Field field = object.getClass().getDeclaredField(fieldName);
+			field.setAccessible(true);
+			field.set(object, value);
+		} catch (NoSuchFieldException | IllegalAccessException e) {
+			throw new RuntimeException("Incompatible KafkaProducer version", e);
+		}
+	}
+}
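
For reference, here is a minimal standalone sketch of the pre-commit / restore / commit flow described in the class
Javadoc above (the same flow that testResumeTransaction below exercises against a real broker). The broker address,
topic and transactional id are placeholders, and the state handling is reduced to two local variables; in Flink,
producerId and epoch would be kept in checkpointed state by the exactly-once producer built on top of this class.

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class ResumeTransactionSketch {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
		props.put("transactional.id", "example-transactional-id"); // placeholder transactional id
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		// Pre-commit phase: open a transaction, write and flush, but do not commit yet.
		FlinkKafkaProducer<String, String> producer = new FlinkKafkaProducer<>(props);
		producer.initTransactions();
		producer.beginTransaction();
		producer.send(new ProducerRecord<>("example-topic", "key", "value"));
		producer.flush();

		// These two values are the result of the pre-commit and would be persisted
		// (in Flink: in checkpointed state) before the pre-commit is acknowledged.
		long producerId = producer.getProducerId();
		short epoch = producer.getEpoch();

		// ... node failure: the crashed producer is simply gone (intentionally not closed here);
		// producerId and epoch are restored from state ...

		// Commit phase: a fresh producer resumes the interrupted transaction instead of calling
		// initTransactions(), which would abort it, and finishes the second phase of the commit.
		try (FlinkKafkaProducer<String, String> resumedProducer = new FlinkKafkaProducer<>(props)) {
			resumedProducer.resumeTransaction(producerId, epoch);
			resumedProducer.commitTransaction();
		}
	}
}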

http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
new file mode 100644
index 0000000..18bbd8f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for our own {@link FlinkKafkaProducer}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafkaProducerTests extends KafkaTestBase {
+	protected String transactionalId;
+	protected Properties extraProperties;
+
+	@Before
+	public void before() {
+		transactionalId = UUID.randomUUID().toString();
+		extraProperties = new Properties();
+		extraProperties.putAll(standardProps);
+		extraProperties.put("transactional.id", transactionalId);
+		extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("isolation.level", "read_committed");
+	}
+
+	@Test(timeout = 30000L)
+	public void testHappyPath() throws IOException {
+		String topicName = "flink-kafka-producer-happy-path";
+		try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.commitTransaction();
+		}
+		assertRecord(topicName, "42", "42");
+		deleteTestTopic(topicName);
+	}
+
+	@Test(timeout = 30000L)
+	public void testResumeTransaction() throws IOException {
+		String topicName = "flink-kafka-producer-resume-transaction";
+		try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.flush();
+			long producerId = kafkaProducer.getProducerId();
+			short epoch = kafkaProducer.getEpoch();
+
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+
+			assertRecord(topicName, "42", "42");
+
+			// this shouldn't throw - in case of a network split, the old producer might attempt to commit its transaction
+			kafkaProducer.commitTransaction();
+
+			// this shouldn't fail either, for the same reason as above
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+		}
+		deleteTestTopic(topicName);
+	}
+
+	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+			kafkaConsumer.subscribe(Collections.singletonList(topicName));
+			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+
+			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+			assertEquals(expectedKey, record.key());
+			assertEquals(expectedValue, record.value());
+		}
+	}
+}