You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/17 23:21:55 UTC
kafka git commit: KAFKA-5188; Integration tests for transactions
Repository: kafka
Updated Branches:
refs/heads/trunk 2181ae768 -> b3a33ce4b
KAFKA-5188; Integration tests for transactions
Author: Apurva Mehta <ap...@confluent.io>
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #2994 from apurvam/KAFKA-5188-exactly-once-integration-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3a33ce4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3a33ce4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3a33ce4
Branch: refs/heads/trunk
Commit: b3a33ce4b81a20ae5635cf28490fd2e1f9d86141
Parents: 2181ae7
Author: Apurva Mehta <ap...@confluent.io>
Authored: Wed May 17 16:20:33 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 17 16:20:33 2017 -0700
----------------------------------------------------------------------
.../clients/producer/internals/Sender.java | 59 ++-
.../producer/internals/TransactionManager.java | 25 +-
.../kafka/common/requests/EndTxnRequest.java | 4 +
.../internals/TransactionManagerTest.java | 47 ++-
.../transaction/TransactionCoordinator.scala | 24 +-
.../transaction/TransactionMetadata.scala | 2 +-
.../transaction/TransactionStateManager.scala | 1 -
.../kafka/api/TransactionsBounceTest.scala | 185 +++++++++
.../kafka/api/TransactionsTest.scala | 400 +++++++++++++++++++
.../test/scala/unit/kafka/utils/TestUtils.scala | 95 ++++-
10 files changed, 806 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 209a979..7180171 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -39,7 +40,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.InitProducerIdRequest;
@@ -187,10 +187,12 @@ public class Sender implements Runnable {
* @param now The current POSIX time in milliseconds
*/
void run(long now) {
- long pollTimeout = 0;
- if (!maybeSendTransactionalRequest(now))
+ long pollTimeout = retryBackoffMs;
+ if (!maybeSendTransactionalRequest(now)) {
pollTimeout = sendProducerData(now);
+ }
+ log.trace("waiting {}ms in poll", pollTimeout);
this.client.poll(pollTimeout, now);
}
@@ -203,6 +205,7 @@ public class Sender implements Runnable {
final KafkaException exception = transactionManager.lastError() instanceof KafkaException
? (KafkaException) transactionManager.lastError()
: new KafkaException(transactionManager.lastError());
+ log.error("aborting producer batches because the transaction manager is in an error state.", exception);
this.accumulator.abortBatches(exception);
return Long.MAX_VALUE;
}
@@ -281,25 +284,35 @@ public class Sender implements Runnable {
}
private boolean maybeSendTransactionalRequest(long now) {
- if (transactionManager != null && transactionManager.hasInflightRequest())
+ if (transactionManager == null || !transactionManager.isTransactional())
+ return false;
+
+ if (transactionManager.hasInflightRequest()) {
+ log.trace("TransactionalId: {} -- There is already an inflight transactional request. Going to wait for the response.",
+ transactionManager.transactionalId());
return true;
+ }
- if (transactionManager == null || !transactionManager.hasPendingTransactionalRequests())
+ if (!transactionManager.hasPendingTransactionalRequests()) {
+ log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionManager.transactionalId());
return false;
+ }
TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
- if (nextRequestHandler.isEndTxn()) {
- if (transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
- if (!accumulator.flushInProgress())
- accumulator.beginFlush();
- transactionManager.reenqueue(nextRequestHandler);
- return false;
- } else if (transactionManager.isInErrorState()) {
- nextRequestHandler.fatal(new KafkaException("Cannot commit transaction when there are " +
- "request errors. Please check your logs for the details of the errors encountered."));
- return false;
- }
+ if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
+ if (!accumulator.flushInProgress())
+ accumulator.beginFlush();
+ transactionManager.reenqueue(nextRequestHandler);
+ log.trace("TransactionalId: {} -- Going to wait for pending ProducerBatches to flush before sending an " +
+ "end transaction request", transactionManager.transactionalId());
+ return false;
+ }
+
+ if (transactionManager.maybeTerminateRequestWithError(nextRequestHandler)) {
+ log.trace("TransactionalId: {} -- Not sending a transactional request because we are in an error state",
+ transactionManager.transactionalId());
+ return false;
}
Node targetNode = null;
@@ -314,7 +327,6 @@ public class Sender implements Runnable {
}
if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) {
transactionManager.lookupCoordinator(nextRequestHandler);
- targetNode = null;
break;
}
} else {
@@ -322,23 +334,30 @@ public class Sender implements Runnable {
}
if (targetNode != null) {
if (nextRequestHandler.isRetry()) {
+ log.trace("TransactionalId: {} -- Waiting {}ms before resending a transactional request {}",
+ transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder());
time.sleep(retryBackoffMs);
}
ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequestHandler.requestBuilder(),
now, true, nextRequestHandler);
transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
+ log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(),
+ nextRequestHandler.requestBuilder(), clientRequest.destination());
client.send(clientRequest, now);
return true;
}
} catch (IOException e) {
- log.warn("Got an exception when trying to find a node to send a transactional request to. Going to back off and retry", e);
+ targetNode = null;
+ log.warn("TransactionalId: " + transactionManager.transactionalId() + " -- Got an exception when trying " +
+ "to find a node to send transactional request " + nextRequestHandler.requestBuilder() + ". Going to back off and retry", e);
}
+ log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.",
+ transactionManager.transactionalId(), retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
- if (targetNode == null)
- transactionManager.retry(nextRequestHandler);
+ transactionManager.retry(nextRequestHandler);
return true;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 551a75a..55c1782 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -95,7 +95,8 @@ public class TransactionManager {
case INITIALIZING:
return source == UNINITIALIZED || source == ERROR;
case READY:
- return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
+ return source == INITIALIZING || source == COMMITTING_TRANSACTION
+ || source == ABORTING_TRANSACTION || source == ERROR;
case IN_TRANSACTION:
return source == READY;
case COMMITTING_TRANSACTION:
@@ -246,7 +247,7 @@ public class TransactionManager {
}
public boolean isInErrorState() {
- return currentState == State.ERROR;
+ return currentState == State.ERROR || currentState == State.FENCED;
}
public synchronized void setError(Exception exception) {
@@ -256,6 +257,23 @@ public class TransactionManager {
transitionTo(State.ERROR, exception);
}
+ boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
+ if (isInErrorState() && requestHandler.isEndTxn()) {
+ // We shouldn't terminate abort requests from error states.
+ EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
+ if (endTxnHandler.builder.result() == TransactionResult.ABORT)
+ return false;
+ String errorMessage = "Cannot commit transaction because at least one previous transactional request " +
+ "was not completed successfully.";
+ if (lastError != null)
+ requestHandler.fatal(new KafkaException(errorMessage, lastError));
+ else
+ requestHandler.fatal(new KafkaException(errorMessage));
+ return true;
+ }
+ return false;
+ }
+
/**
* Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
* verify that the result is valid.
@@ -484,6 +502,7 @@ public class TransactionManager {
"transactional.id has been started: producerId: {}. epoch: {}.",
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
+ lastError = Errors.INVALID_PRODUCER_EPOCH.exception();
transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
result.done();
}
@@ -501,10 +520,12 @@ public class TransactionManager {
} else {
clearInFlightRequestCorrelationId();
if (response.wasDisconnected()) {
+ log.trace("disconnected from " + response.destination() + ". Will retry.");
reenqueue();
} else if (response.versionMismatch() != null) {
fatal(response.versionMismatch());
} else if (response.hasResponse()) {
+ log.trace("Got transactional response for request:" + requestBuilder());
handleResponse(response.responseBody());
} else {
fatal(new KafkaException("Could not execute transactional request for unknown reasons"));
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index ff9b82c..77ec137 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -42,6 +42,10 @@ public class EndTxnRequest extends AbstractRequest {
this.result = result;
}
+ public TransactionResult result() {
+ return result;
+ }
+
@Override
public EndTxnRequest build(short version) {
return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 4db0452..6a35061 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -433,7 +433,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -442,6 +442,8 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // send produce.
+ assertTrue(responseFuture.isDone());
+ assertTrue(transactionManager.isInErrorState());
responseFuture.get();
}
@@ -500,6 +502,48 @@ public class TransactionManagerTest {
}
@Test
+ public void testAllowAbortOnProduceFailure() throws InterruptedException {
+ client.setNode(brokerNode);
+ // This is called from the initTransactions method in the producer as the first order of business.
+ // It finds the coordinator and then gets a PID.
+ final long pid = 13131L;
+ final short epoch = 1;
+ transactionManager.initializeTransactions();
+ prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+
+ sender.run(time.milliseconds()); // find coordinator
+ sender.run(time.milliseconds());
+
+ prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+ sender.run(time.milliseconds()); // get pid.
+
+ assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+ assertTrue(transactionManager.hasProducerId());
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ assertFalse(responseFuture.isDone());
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+ prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+
+ sender.run(time.milliseconds()); // Send AddPartitionsRequest
+ assertFalse(abortResult.isCompleted());
+
+ sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException.
+ sender.run(time.milliseconds()); // try to abort
+ assertTrue(abortResult.isCompleted());
+ assertTrue(abortResult.isSuccessful());
+ assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now.
+ }
+
+
+ @Test
public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
}
@@ -519,6 +563,7 @@ public class TransactionManagerTest {
final long pid = 1L;
final short epoch = 1;
+
prepareInitPidResponse(Errors.NONE, false, pid, epoch);
sender.run(time.milliseconds()); // get pid.
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 233f7d7..8148cb6 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -271,6 +271,14 @@ class TransactionCoordinator(brokerId: Int,
txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
}
+ private def logInvalidStateTransitionAndReturnError(transactionalId: String,
+ transactionState: TransactionState,
+ transactionResult: TransactionResult) = {
+ error(s"transactionalId: $transactionalId -- Current state is $transactionState, but received transaction " +
+ s"marker result: $transactionResult")
+ Left(Errors.INVALID_TXN_STATE)
+ }
+
def handleEndTransaction(transactionalId: String,
producerId: Long,
producerEpoch: Short,
@@ -306,24 +314,24 @@ class TransactionCoordinator(brokerId: Int,
if (txnMarkerResult == TransactionResult.COMMIT)
Left(Errors.NONE)
else
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case CompleteAbort =>
if (txnMarkerResult == TransactionResult.ABORT)
Left(Errors.NONE)
else
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case PrepareCommit =>
if (txnMarkerResult == TransactionResult.COMMIT)
Left(Errors.CONCURRENT_TRANSACTIONS)
else
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case PrepareAbort =>
if (txnMarkerResult == TransactionResult.ABORT)
Left(Errors.CONCURRENT_TRANSACTIONS)
else
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case Empty =>
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
}
}
}
@@ -349,15 +357,15 @@ class TransactionCoordinator(brokerId: Int,
Left(Errors.CONCURRENT_TRANSACTIONS)
else txnMetadata.state match {
case Empty| Ongoing | CompleteCommit | CompleteAbort =>
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
case PrepareCommit =>
if (txnMarkerResult != TransactionResult.COMMIT)
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
else
Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
case PrepareAbort =>
if (txnMarkerResult != TransactionResult.ABORT)
- Left(Errors.INVALID_TXN_STATE)
+ logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
else
Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d05676b..0d176aa 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -135,7 +135,7 @@ private[transaction] class TransactionMetadata(val producerId: Long,
}
def removePartition(topicPartition: TopicPartition): Unit = {
- if (pendingState.isDefined || (state != PrepareCommit && state != PrepareAbort))
+ if (pendingState.isDefined && (state != PrepareCommit && state != PrepareAbort))
throw new IllegalStateException(s"Transation metadata's current state is $state, and its pending state is $state " +
s"while trying to remove partitions whose txn marker has been sent, this is not expected")
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index cf41fc3..2327213 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -155,7 +155,6 @@ class TransactionStateManager(brokerId: Int,
props.put(LogConfig.UncleanLeaderElectionEnableProp, "false")
props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
props.put(LogConfig.MinInSyncReplicasProp, config.transactionLogMinInsyncReplicas.toString)
props.put(LogConfig.SegmentBytesProp, config.transactionLogSegmentBytes.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
new file mode 100644
index 0000000..f1fd365
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{ShutdownableThread, TestUtils}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{Ignore, Test}
+
+import scala.collection.JavaConversions._
+import org.junit.Assert._
+
+
+class TransactionsBounceTest extends KafkaServerTestHarness {
+ private val producerBufferSize = 65536
+ private val serverMessageMaxBytes = producerBufferSize/2
+ private val numPartitions = 3
+
+ val numServers = 4
+ private val outputTopic = "output-topic"
+ private val inputTopic = "input-topic"
+
+ val overridingProps = new Properties()
+ overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+ overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
+ // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
+ // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+ overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+ overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+ overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+ overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
+ overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, 3.toString)
+ overridingProps.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+ overridingProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 1.toString)
+ overridingProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 3.toString)
+ overridingProps.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
+ overridingProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
+
+
+ // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
+ // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
+ // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
+ // where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have
+ // been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a
+ // running server.
+ //
+ // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
+ // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
+ override def generateConfigs() = {
+ FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
+ .map(KafkaConfig.fromProps(_, overridingProps))
+ }
+
+ @Ignore // need to fix KAFKA-5268 and KAFKA-5269 before re-enabling
+ @Test
+ def testBrokerFailure() {
+ // basic idea is to seed a topic with 10000 records, and copy it transactionally while bouncing brokers
+ // constantly through the period.
+ val consumerGroup= "myGroup"
+ val numInputRecords = 5000
+ createTopics()
+
+ TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers)
+
+ var consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
+ val producer = TestUtils.createTransactionalProducer("my-test-producer-t.id", servers)
+
+ val scheduler = new BounceScheduler
+ producer.initTransactions()
+ scheduler.start()
+
+ var numMessagesProcessed = 0
+ var iteration = 0
+ try {
+ while (numMessagesProcessed < numInputRecords) {
+ val toRead = Math.min(200, numInputRecords - numMessagesProcessed)
+ trace(s"$iteration: About to read $toRead messages, processed $numMessagesProcessed so far..")
+ val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead)
+ trace(s"received ${records.size} messages. sending them transactionally to $outputTopic")
+ producer.beginTransaction()
+ val shouldAbort = iteration % 10 == 0
+ records.zipWithIndex.foreach { case (record, i) =>
+ producer.send(
+ TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value, !shouldAbort),
+ new ErrorLoggingCallback(outputTopic, record.key, record.value, true))
+ }
+ trace(s"Sent ${records.size} messages. Committing offsets.")
+ producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup)
+ if (shouldAbort) {
+ trace(s"Committed offsets. Aborting transaction of ${records.size} messages.")
+ producer.abortTransaction()
+ consumer.close()
+ consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
+ } else {
+ trace(s"Committed offsets. committing transaction of ${records.size} messages.")
+ producer.commitTransaction()
+ numMessagesProcessed += records.size
+ }
+ iteration += 1
+ }
+ } finally {
+ producer.close()
+ consumer.close()
+ }
+
+ scheduler.shutdown()
+
+ val verifyingConsumer = createConsumerAndSubscribeToTopics("randoGroup", List(outputTopic), readCommitted = true)
+ val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { case(record) =>
+ TestUtils.assertCommittedAndGetValue(record).toInt
+ }
+ val recordSet = outputRecords.toSet
+ assertEquals(numInputRecords, recordSet.size)
+
+ val expectedValues = (0 until numInputRecords).toSet
+ assertEquals(s"Missing messages: ${expectedValues -- recordSet}", expectedValues, recordSet)
+
+ verifyingConsumer.close()
+ }
+
+ private def createConsumerAndSubscribeToTopics(groupId: String, topics: List[String], readCommitted: Boolean = false) = {
+ val props = new Properties()
+ if (readCommitted)
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200")
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")
+ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+ val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId,
+ securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+ consumer.subscribe(topics)
+ consumer
+ }
+
+ private def createTopics() = {
+ val topicConfig = new Properties()
+ topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+ TestUtils.createTopic(zkUtils, inputTopic, numPartitions, numServers, servers, topicConfig)
+ TestUtils.createTopic(zkUtils, outputTopic, numPartitions, numServers, servers, topicConfig)
+ }
+
+ private class BounceScheduler extends ShutdownableThread("daemon-broker-bouncer", false) {
+ override def doWork(): Unit = {
+ for (server <- servers) {
+ trace("Shutting down server : %s".format(server.config.brokerId))
+ server.shutdown()
+ server.awaitShutdown()
+ Thread.sleep(500)
+ trace("Server %s shut down. Starting it up again.".format(server.config.brokerId))
+ server.startup()
+ trace("Restarted server: %s".format(server.config.brokerId))
+ Thread.sleep(500)
+ }
+
+ (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, outputTopic, partition))
+ }
+
+ override def shutdown(){
+ super.shutdown()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
new file mode 100644
index 0000000..3e19bb9
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ProducerFencedException
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{After, Before, Ignore, Test}
+import org.junit.Assert._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+class TransactionsTest extends KafkaServerTestHarness {
+ val numServers = 3
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
+
+ override def generateConfigs : Seq[KafkaConfig] = {
+ TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps()))
+ }
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+ val numPartitions = 3
+ val topicConfig = new Properties()
+ topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+ TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
+ TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ super.tearDown()
+ }
+
+ @Test
+ def testBasicTransactions() = {
+ val producer = TestUtils.createTransactionalProducer("my-hello-world-transactional-id", servers)
+ val consumer = transactionalConsumer("transactional-group")
+ val unCommittedConsumer = nonTransactionalConsumer("non-transactional-group")
+ try {
+ producer.initTransactions()
+
+ producer.beginTransaction()
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
+ producer.abortTransaction()
+
+ producer.beginTransaction()
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = true))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = true))
+ producer.commitTransaction()
+
+ consumer.subscribe(List(topic1, topic2))
+ unCommittedConsumer.subscribe(List(topic1, topic2))
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.zipWithIndex.foreach { case (record, i) =>
+ TestUtils.assertCommittedAndGetValue(record)
+ }
+
+ val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
+ val expectedValues = List("1", "2", "3", "4").toSet
+ allRecords.zipWithIndex.foreach { case (record, i) =>
+ assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
+ }
+ } finally {
+ consumer.close()
+ producer.close()
+ unCommittedConsumer.close()
+ }
+ }
+
+ @Test
+ def testSendOffsets() = {
+ // The basic plan for the test is as follows:
+ // 1. Seed topic1 with 1000 unique, numbered, messages.
+ // 2. Run a consume/process/produce loop to transactionally copy messages from topic1 to topic2 and commit
+ // offsets as part of the transaction.
+ // 3. Randomly abort transactions in step2.
+ // 4. Validate that we have 1000 unique committed messages in topic2. If the offsets were committed properly with the
+ // transactions, we should not have any duplicates or missing messages since we should process in the input
+ // messages exactly once.
+
+ val transactionalId = "foobar-id"
+ val consumerGroupId = "foobar-consumer-group"
+ val numSeedMessages = 500
+
+ TestUtils.seedTopicWithNumberedRecords(topic1, numSeedMessages, servers)
+
+ val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
+
+ var consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
+ consumer.subscribe(List(topic1))
+ producer.initTransactions()
+
+ val random = new Random()
+ var shouldCommit = false
+ var recordsProcessed = 0
+ try {
+ while (recordsProcessed < numSeedMessages) {
+ producer.beginTransaction()
+ shouldCommit = !shouldCommit
+
+ val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed))
+ records.zipWithIndex.foreach { case (record, i) =>
+ val key = new String(record.key(), "UTF-8")
+ val value = new String(record.value(), "UTF-8")
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key, value, willBeCommitted = shouldCommit))
+ }
+
+ producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroupId)
+ if (shouldCommit) {
+ producer.commitTransaction()
+ recordsProcessed += records.size
+ debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " +
+ s"records written to $topic2: $recordsProcessed")
+ } else {
+ producer.abortTransaction()
+ debug(s"aborted transaction Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " +
+ s"records written to $topic2: $recordsProcessed")
+ consumer.close()
+ consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
+ consumer.subscribe(List(topic1))
+ }
+ }
+ } finally {
+ producer.close()
+ consumer.close()
+ }
+
+ // Inspite of random aborts, we should still have exactly 1000 messages in topic2. Ie. we should not
+ // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
+ val verifyingConsumer = transactionalConsumer("foobargroup")
+ verifyingConsumer.subscribe(List(topic2))
+ val valueSeq = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages).map { record =>
+ TestUtils.assertCommittedAndGetValue(record).toInt
+ }
+ verifyingConsumer.close()
+ val valueSet = valueSeq.toSet
+ assertEquals(s"Expected $numSeedMessages values in $topic2.", numSeedMessages, valueSeq.size)
+ assertEquals(s"Expected ${valueSeq.size} unique messages in $topic2.", valueSeq.size, valueSet.size)
+ }
+
+ @Test
+ def testFencingOnCommit() = {
+ val transactionalId = "my-t.id"
+ val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val consumer = transactionalConsumer()
+ consumer.subscribe(List(topic1, topic2))
+
+ try {
+ producer1.initTransactions()
+
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+ try {
+ producer1.commitTransaction()
+ fail("Should not be able to commit transactions from a fenced producer.")
+ } catch {
+ case e : ProducerFencedException =>
+ // good!
+ producer1.close()
+ case e : Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
+
+ producer2.commitTransaction() // ok
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.zipWithIndex.foreach { case (record, i) =>
+ TestUtils.assertCommittedAndGetValue(record)
+ }
+ } finally {
+ consumer.close()
+ producer2.close()
+ }
+ }
+
+ @Test
+ def testFencingOnSendOffsets() = {
+ val transactionalId = "my-t.id"
+ val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val consumer = transactionalConsumer()
+ consumer.subscribe(List(topic1, topic2))
+
+ try {
+ producer1.initTransactions()
+
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+ try {
+ producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)), "foobarGroup")
+ fail("Should not be able to send offsets from a fenced producer.")
+ } catch {
+ case e : ProducerFencedException =>
+ // good!
+ producer1.close()
+ case e : Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
+
+ producer2.commitTransaction() // ok
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.zipWithIndex.foreach { case (record, i) =>
+ TestUtils.assertCommittedAndGetValue(record)
+ }
+ } finally {
+ consumer.close()
+ producer2.close()
+ }
+ }
+
+ @Ignore @Test
+ def testFencingOnSend() {
+ val transactionalId = "my-t.id"
+ val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val consumer = transactionalConsumer()
+ consumer.subscribe(List(topic1, topic2))
+
+ try {
+ producer1.initTransactions()
+
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
+
+ try {
+ val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+ val recordMetadata = result.get()
+ error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
+ servers.foreach { case (server) =>
+ error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+ }
+ fail("Should not be able to send messages from a fenced producer.")
+ } catch {
+ case e : ProducerFencedException =>
+ producer1.close()
+ case e : ExecutionException =>
+ assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+ producer1.close()
+ case e : Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
+
+ producer2.commitTransaction() // ok
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.zipWithIndex.foreach { case (record, i) =>
+ TestUtils.assertCommittedAndGetValue(record)
+ }
+ } finally {
+ consumer.close()
+ producer2.close()
+ }
+ }
+
+ @Test
+ def testFencingOnAddPartitions(): Unit = {
+ val transactionalId = "my-t.id"
+ val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val consumer = transactionalConsumer()
+ consumer.subscribe(List(topic1, topic2))
+
+ try {
+ producer1.initTransactions()
+
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+ producer1.abortTransaction()
+
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
+
+ try {
+ producer1.beginTransaction()
+ val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+ val recordMetadata = result.get()
+ error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
+ servers.foreach { case (server) =>
+ error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+ }
+ fail("Should not be able to send messages from a fenced producer.")
+ } catch {
+ case e : ProducerFencedException =>
+ producer1.close()
+ case e : ExecutionException =>
+ assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+ producer1.close()
+ case e : Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
+
+ producer2.commitTransaction() // ok
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.zipWithIndex.foreach { case (record, i) =>
+ TestUtils.assertCommittedAndGetValue(record)
+ }
+ } finally {
+ consumer.close()
+ producer2.close()
+ }
+ }
+
+ private def serverProps() = {
+ val serverProps = new Properties()
+ serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+ // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
+ // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+ serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
+ serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
+ serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
+ serverProps.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
+ serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+ serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+ serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+ serverProps
+ }
+
+ private def transactionalConsumer(group: String = "group", maxPollRecords: Int = 500) = {
+ val props = new Properties()
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+ TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+ groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+ }
+
+ private def nonTransactionalConsumer(group: String = "group") = {
+ val props = new Properties()
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+ groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+ }
+
+ private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+ val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
+ TestUtils.waitUntilTrue(() => {
+ records ++= consumer.poll(50)
+ records.size == numRecords
+ }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
+ records
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6bee18d..90dcacd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -20,7 +20,7 @@ package kafka.utils
import java.io._
import java.nio._
import java.nio.channels._
-import java.nio.charset.Charset
+import java.nio.charset.{Charset, StandardCharsets}
import java.security.cert.X509Certificate
import java.util.{ArrayList, Collections, Properties}
import java.util.concurrent.{Callable, Executors, TimeUnit}
@@ -40,9 +40,10 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils._
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.SecurityProtocol
@@ -55,8 +56,9 @@ import org.apache.zookeeper.ZooDefs._
import org.apache.zookeeper.data.ACL
import org.junit.Assert._
+import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{Map, mutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Try
@@ -75,6 +77,10 @@ object TestUtils extends Logging {
/** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */
val MockZkConnect = "127.0.0.1:" + MockZkPort
+ private val transactionStatusKey = "transactionStatus"
+ private val committedValue : Array[Byte] = "committed".getBytes(StandardCharsets.UTF_8)
+ private val abortedValue : Array[Byte] = "aborted".getBytes(StandardCharsets.UTF_8)
+
/**
* Create a temporary directory
*/
@@ -1330,6 +1336,89 @@ object TestUtils extends Logging {
assertEquals("Consumed more records than expected", numMessages, records.size)
records
}
+
+ def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer]) = {
+ val props = new Properties()
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+ TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+ }
+
+ // Seeds the given topic with records with keys and values in the range [0..numRecords)
+ def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Int = {
+ val props = new Properties()
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+ var recordsWritten = 0
+ val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+ try {
+ for (i <- 0 until numRecords) {
+ producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, asBytes(i.toString), asBytes(i.toString)))
+ recordsWritten += 1
+ }
+ producer.flush()
+ } finally {
+ producer.close()
+ }
+ recordsWritten
+ }
+
+ private def asString(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8)
+
+ private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8)
+
+ // Verifies that the record was intended to be committed by checking the the headers for an expected transaction status
+ // If true, this will return the value as a string. It is expected that the record in question should have been created
+ // by the `producerRecordWithExpectedTransactionStatus` method.
+ def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {
+ record.headers.headers(transactionStatusKey).headOption match {
+ case Some(header) =>
+ assertEquals(s"Got ${asString(header.value)} but expected the value to indicate " +
+ s"committed status.", asString(committedValue), asString(header.value))
+ case None =>
+ fail("expected the record header to include an expected transaction status, but received nothing.")
+ }
+ recordValueAsString(record)
+ }
+
+ def recordValueAsString(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {
+ asString(record.value)
+ }
+
+ def producerRecordWithExpectedTransactionStatus(topic: String, key: Array[Byte], value: Array[Byte],
+ willBeCommitted: Boolean) : ProducerRecord[Array[Byte], Array[Byte]] = {
+ val header = new Header {override def key() = transactionStatusKey
+ override def value() = if (willBeCommitted)
+ committedValue
+ else
+ abortedValue
+ }
+ new ProducerRecord[Array[Byte], Array[Byte]](topic, null, key, value, List(header))
+ }
+
+ def producerRecordWithExpectedTransactionStatus(topic: String, key: String, value: String,
+ willBeCommitted: Boolean) : ProducerRecord[Array[Byte], Array[Byte]] = {
+ producerRecordWithExpectedTransactionStatus(topic, asBytes(key), asBytes(value), willBeCommitted)
+ }
+
+ // Collect the current positions for all partition in the consumers current assignment.
+ def consumerPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) : Map[TopicPartition, OffsetAndMetadata] = {
+ val offsetsToCommit = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+ consumer.assignment.foreach{ topicPartition =>
+ offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition)))
+ }
+ offsetsToCommit.toMap
+ }
+
+ def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+ val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
+ TestUtils.waitUntilTrue(() => {
+ records ++= consumer.poll(50)
+ records.size >= numRecords
+ }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
+ records
+ }
+
}
class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {