You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/10 06:10:11 UTC

kafka git commit: KAFKA-3162: Added producer and consumer interceptors

Repository: kafka
Updated Branches:
  refs/heads/trunk fe3b7492b -> d5b43b19b


KAFKA-3162: Added producer and consumer interceptors

This is most of KIP-42: Producer and consumer interceptors (everything except exposing CRC and record sizes to the interceptors, which is coming in a separate PR tracked by KAFKA-3196).

This PR includes:
1. Add ProducerInterceptor interface and call its callbacks from appropriate places in Kafka Producer.
2. Add ConsumerInterceptor interface and call its callbacks from appropriate places in Kafka Consumer.
3. Add unit tests for interceptor changes
4. Add integration test for both mutable consumer and producer interceptors.

Author: Anna Povzner <an...@confluent.io>

Reviewers: Jason Gustafson, Ismael Juma, Gwen Shapira

Closes #854 from apovzner/kip42


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d5b43b19
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d5b43b19
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d5b43b19

Branch: refs/heads/trunk
Commit: d5b43b19bb06e9cdc606312c8bcf87ed267daf44
Parents: fe3b749
Author: Anna Povzner <an...@confluent.io>
Authored: Tue Feb 9 22:10:06 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Feb 9 22:10:06 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  10 ++
 .../clients/consumer/ConsumerInterceptor.java   |  72 ++++++++
 .../kafka/clients/consumer/KafkaConsumer.java   |  15 +-
 .../consumer/internals/ConsumerCoordinator.java |  12 +-
 .../internals/ConsumerInterceptors.java         |  99 +++++++++++
 .../kafka/clients/producer/KafkaProducer.java   |  54 ++++++
 .../kafka/clients/producer/ProducerConfig.java  |  11 ++
 .../clients/producer/ProducerInterceptor.java   |  88 ++++++++++
 .../internals/ProducerInterceptors.java         | 106 +++++++++++
 .../kafka/common/config/AbstractConfig.java     |  10 ++
 .../clients/consumer/KafkaConsumerTest.java     |  24 +++
 .../internals/ConsumerCoordinatorTest.java      |   3 +-
 .../internals/ConsumerInterceptorsTest.java     | 174 +++++++++++++++++++
 .../clients/producer/KafkaProducerTest.java     |  25 +++
 .../internals/ProducerInterceptorsTest.java     | 147 ++++++++++++++++
 .../kafka/test/MockConsumerInterceptor.java     |  80 +++++++++
 .../kafka/test/MockProducerInterceptor.java     |  85 +++++++++
 .../kafka/api/BaseConsumerTest.scala            |  29 ++--
 .../kafka/api/PlaintextConsumerTest.scala       | 157 ++++++++++++++++-
 19 files changed, 1182 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index df192b9..3132cae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -162,6 +162,11 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
+    /** <code>interceptor.classes</code> */
+    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+    public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
+                                                        + "Implementing the <code>ConsumerInterceptor</code> interface allows you to intercept (and possibly mutate) records "
+                                                        + "received by the consumer. By default, there are no interceptors.";
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -296,6 +301,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         9 * 60 * 1000,
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                                .define(INTERCEPTOR_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        null,
+                                        Importance.LOW,
+                                        INTERCEPTOR_CLASSES_DOC)
 
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
new file mode 100644
index 0000000..5c13a41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case
+ * is for third-party components to hook into the consumer applications for custom monitoring, logging, etc.
+ *
+ * <p>
+ * This class will get consumer config properties via <code>configure()</code> method, including clientId assigned
+ * by KafkaConsumer if not specified in the consumer config. The interceptor implementation needs to be aware that it will be
+ * sharing consumer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
+ * <p>
+ * Exceptions thrown by ConsumerInterceptor methods will be caught, logged, but not propagated further. As a result, if
+ * the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception,
+ * just log the errors.
+ * <p>
+ * ConsumerInterceptor callbacks are called from the same thread that invokes {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}.
+ */
+public interface ConsumerInterceptor<K, V> extends Configurable {
+
+    /**
+     * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}
+     * <p>
+     * This method is allowed to modify consumer records, in which case the new records will be returned.
+     * Any exception thrown by this method will be caught by the caller, logged, but not propagated to the client.
+     * <p>
+     * Since the consumer may run multiple interceptors, a particular interceptor's onConsume() callback will be called
+     * in the order specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#INTERCEPTOR_CLASSES_CONFIG}.
+     * The first interceptor in the list gets the consumed records, the following interceptor will be passed the records returned
+     * by the previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
+     * the records already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
+     * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing
+     * to modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onConsume(),
+     * the exception is caught, logged, and the next interceptor is called with the records returned by the last successful interceptor
+     * in the list, or otherwise the original consumed records.
+     *
+     * @param records records to be consumed by the client or records returned by the previous interceptors in the list.
+     * @return records that are either modified by the interceptor or same as records passed to this method.
+     */
+    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
+
+    /**
+     * This is called when offsets get committed.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param offsets A map of offsets by partition with associated metadata
+     */
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
+
+    /**
+     * This is called when the interceptor is closed.
+     */
+    public void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c14cc68..faa9a78 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -464,6 +465,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
+    private final ConsumerInterceptors<K, V> interceptors;
 
     private final Time time;
     private final ConsumerNetworkClient client;
@@ -589,6 +591,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
+            // load interceptors and make sure they get clientId
+            Map<String, Object> userProvidedConfigs = config.originals();
+            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    ConsumerInterceptor.class);
+            this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
             this.coordinator = new ConsumerCoordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
@@ -602,7 +610,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     retryBackoffMs,
                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+                    config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                    this.interceptors);
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -860,7 +869,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // auto-committing offsets
                     fetcher.initFetches(metadata.fetch());
                     client.quickPoll();
-                    return new ConsumerRecords<>(records);
+                    return this.interceptors == null
+                        ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }
 
                 long elapsed = time.milliseconds() - start;
@@ -1273,6 +1283,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         this.closed = true;
         ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
+        ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 41d2a27..aa39e11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -68,6 +68,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final OffsetCommitCallback defaultOffsetCommitCallback;
     private final boolean autoCommitEnabled;
     private final AutoCommitTask autoCommitTask;
+    private final ConsumerInterceptors interceptors;
 
     /**
      * Initialize the coordination manager.
@@ -85,7 +86,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                long retryBackoffMs,
                                OffsetCommitCallback defaultOffsetCommitCallback,
                                boolean autoCommitEnabled,
-                               long autoCommitIntervalMs) {
+                               long autoCommitIntervalMs,
+                               ConsumerInterceptors interceptors) {
         super(client,
                 groupId,
                 sessionTimeoutMs,
@@ -107,6 +109,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
+        this.interceptors = interceptors;
     }
 
     @Override
@@ -326,6 +329,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         future.addListener(new RequestFutureListener<Void>() {
             @Override
             public void onSuccess(Void value) {
+                if (interceptors != null)
+                    interceptors.onCommit(offsets);
                 cb.onComplete(offsets, null);
             }
 
@@ -354,8 +359,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
             client.poll(future);
 
-            if (future.succeeded())
+            if (future.succeeded()) {
+                if (interceptors != null)
+                    interceptors.onCommit(offsets);
                 return;
+            }
 
             if (!future.isRetriable())
                 throw future.exception();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
new file mode 100644
index 0000000..f22686e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A container that holds the list of {@link org.apache.kafka.clients.consumer.ConsumerInterceptor} instances
+ * and wraps calls to the chain of custom interceptors.
+ */
+public class ConsumerInterceptors<K, V> implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
+    private final List<ConsumerInterceptor<K, V>> interceptors;
+
+    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called when the records are about to be returned to the user.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onConsume(ConsumerRecords)} for each
+     * interceptor. Records returned from each interceptor get passed to onConsume() of the next interceptor
+     * in the chain of interceptors.
+     * <p>
+     * This method does not throw exceptions. If any of the interceptors in the chain throws an exception,
+     * it gets caught and logged, and the next interceptor in the chain is called with 'records' returned by the
+     * previous successful interceptor onConsume call.
+     *
+     * @param records records to be consumed by the client.
+     * @return records that are either modified by interceptors or same as records passed to this method.
+     */
+    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
+        ConsumerRecords<K, V> interceptRecords = records;
+        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptRecords = interceptor.onConsume(interceptRecords);
+            } catch (Exception e) {
+                // do not propagate interceptor exception, log and continue calling other interceptors
+                log.warn("Error executing interceptor onConsume callback", e);
+            }
+        }
+        return interceptRecords;
+    }
+
+    /**
+     * This is called when commit request returns successfully from the broker.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onCommit(Map)} method for each interceptor.
+     * <p>
+     * This method does not throw exceptions. Exceptions thrown by any of the interceptors in the chain are logged, but not propagated.
+     *
+     * @param offsets A map of offsets by partition with associated metadata
+     */
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.onCommit(offsets);
+            } catch (Exception e) {
+                // do not propagate interceptor exception, just log
+                log.warn("Error executing interceptor onCommit callback", e);
+            }
+        }
+    }
+
+    /**
+     * Closes every interceptor in the container.
+     */
+    @Override
+    public void close() {
+        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.close();
+            } catch (Exception e) {
+                log.error("Failed to close consumer interceptor ", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3fc2a19..a76dc1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -60,6 +61,7 @@ import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * A Kafka client that publishes records to the Kafka cluster.
  * <P>
@@ -147,6 +149,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
     private final int requestTimeoutMs;
+    private final ProducerInterceptors<K, V> interceptors;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -313,6 +316,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                 this.valueSerializer = valueSerializer;
             }
+
+            // load interceptors and make sure they get clientId
+            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    ProducerInterceptor.class);
+            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
+
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
             log.debug("Kafka producer started");
@@ -410,6 +420,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        // intercept the record, which can be potentially modified; this method does not throw exceptions
+        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
+        // producer callback will make sure to call both 'callback' and interceptor callback
+        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors);
+        return doSend(interceptedRecord, interceptCallback);
+    }
+
+    /**
+     * Implementation of asynchronously sending a record to a topic. Called by
+     * {@link #send(ProducerRecord, Callback)} after the interceptors have processed the record; see that method for details.
+     */
+    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
             long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
@@ -452,13 +474,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
             throw new InterruptException(e);
         } catch (BufferExhaustedException e) {
             this.errors.record();
             this.metrics.sensor("buffer-exhausted-records").record();
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
             throw e;
         } catch (KafkaException e) {
             this.errors.record();
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
+            throw e;
+        } catch (Exception e) {
+            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
             throw e;
         }
     }
@@ -650,6 +683,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
         }
 
+        ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
         ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
@@ -716,4 +750,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     }
 
+    /**
+     * A callback called when a producer request is complete. It in turn calls the user-supplied callback (if given) and
+     * notifies producer interceptors about the request completion.
+     */
+    private static class InterceptorCallback<K, V> implements Callback {
+        private final Callback userCallback;
+        private final ProducerInterceptors<K, V> interceptors;
+
+        public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors) {
+            this.userCallback = userCallback;
+            this.interceptors = interceptors;
+        }
+
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(metadata, exception);
+            if (this.userCallback != null)
+                this.userCallback.onCompletion(metadata, exception);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index ae9aa08..ee2b142 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -200,6 +200,12 @@ public class ProducerConfig extends AbstractConfig {
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
+    /** <code>interceptor.classes</code> */
+    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+    public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
+                                                        + "Implementing the <code>ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
+                                                        + "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -277,6 +283,11 @@ public class ProducerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         DefaultPartitioner.class.getName(),
                                         Importance.MEDIUM, PARTITIONER_CLASS_DOC)
+                                .define(INTERCEPTOR_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        null,
+                                        Importance.LOW,
+                                        INTERCEPTOR_CLASSES_DOC)
 
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
new file mode 100644
index 0000000..aa18fdc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
+ * they are published to the Kafka cluster.
+ * <p>
+ * This class will get producer config properties via <code>configure()</code> method, including clientId assigned
+ * by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be
+ * sharing producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
+ * <p>
+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but not propagated further. As a result, if
+ * the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception,
+ * just log the errors.
+ * <p>
+ * ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor<K, V> extends Configurable {
+    /**
+     * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
+     * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
+     * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
+     * <p>
+     * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
+     * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
+     * not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
+     * same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
+     * as expected.
+     * <p>
+     * Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
+     * Most often, it should be the same topic/partition from 'record'.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
+     * <p>
+     * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
+     * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
+     * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
+     * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
+     * the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
+     * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
+     * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
+     * is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
+     * or otherwise the client.
+     *
+     * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
+     * @return producer record to send to topic/partition
+     */
+    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
+
+    /**
+     * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is generally called just before the user callback is called, and in additional cases when <code>KafkaProducer.send()</code>
+     * throws an exception.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the implementation should be reasonably fast.
+     * Otherwise, sending of messages from other threads could be delayed.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error occurred.
+     * @param exception The exception thrown during processing of this record. Null if no error occurred.
+     */
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
+
+    /**
+     * This is called when interceptor is closed
+     */
+    public void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
new file mode 100644
index 0000000..9343a2e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * A container that holds the list of {@link org.apache.kafka.clients.producer.ProducerInterceptor}
+ * and wraps calls to the chain of custom interceptors.
+ */
+public class ProducerInterceptors<K, V> implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
+    private final List<ProducerInterceptor<K, V>> interceptors;
+
+    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called when client sends the record to KafkaProducer, before key and value get serialized.
+     * The method calls {@link ProducerInterceptor#onSend(ProducerRecord)} method. ProducerRecord
+     * returned from the first interceptor's onSend() is passed to the second interceptor onSend(), and so on in the
+     * interceptor chain. The record returned from the last interceptor is returned from this method.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any of the interceptor methods are caught and logged, but not propagated.
+     * If an interceptor in the middle of the chain, that normally modifies the record, throws an exception,
+     * the next interceptor in the chain will be called with a record returned by the previous interceptor that did not
+     * throw an exception.
+     *
+     * @param record the record from client
+     * @return producer record to send to topic/partition
+     */
+    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
+        ProducerRecord<K, V> interceptRecord = record;
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptRecord = interceptor.onSend(interceptRecord);
+            } catch (Exception e) {
+                // do not propagate interceptor exception, log and continue calling other interceptors
+                // be careful not to throw exception from here
+                if (record != null)
+                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
+                else
+                    log.warn("Error executing interceptor onSend callback", e);
+            }
+        }
+        return interceptRecord;
+    }
+
+    /**
+     * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
+     * it gets sent to the server. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)}
+     * method for each interceptor.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any of the interceptor methods are caught and logged, but not propagated.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error occurred.
+     * @param exception The exception thrown during processing of this record. Null if no error occurred.
+     */
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.onAcknowledgement(metadata, exception);
+            } catch (Exception e) {
+                // do not propagate interceptor exceptions, just log
+                log.warn("Error executing interceptor onAcknowledgement callback", e);
+            }
+        }
+    }
+
+    /**
+     * Closes every interceptor in a container.
+     */
+    @Override
+    public void close() {
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.close();
+            } catch (Exception e) {
+                log.error("Failed to close producer interceptor ", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index f9b6cdf..b44f72c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -199,9 +199,19 @@ public class AbstractConfig {
         return t.cast(o);
     }
 
+    /**
+     * Get a list of configured instances of the given class specified by the given configuration key. The configuration
+     * may specify either null or an empty string to indicate no configured instances. In both cases, this method
+     * returns an empty list to indicate no configured instances.
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @return The list of configured instances
+     */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
         List<String> klasses = getList(key);
         List<T> objects = new ArrayList<T>();
+        if (klasses == null)
+            return objects;
         for (String klass : klasses) {
             Object o;
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 5711852..c65fd73 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.Assert;
 import org.junit.Test;
@@ -92,4 +94,26 @@ public class KafkaConsumerTest {
         consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
         consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
     }
+
+    @Test
+    public void testInterceptorConstructorClose() throws Exception {
+        try {
+            Properties props = new Properties();
+            // test with client ID assigned by KafkaConsumer
+            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
+
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
+                    props, new StringDeserializer(), new StringDeserializer());
+            Assert.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
+
+            consumer.close();
+            Assert.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
+        } finally {
+            // cleanup since we are using mutable static variables in MockConsumerInterceptor
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3ae1a36..0b8a162 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -913,7 +913,8 @@ public class ConsumerCoordinatorTest {
                 retryBackoffMs,
                 defaultOffsetCommitCallback,
                 autoCommitEnabled,
-                autoCommitIntervalMs);
+                autoCommitIntervalMs,
+                null);
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
new file mode 100644
index 0000000..45210a8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ConsumerInterceptorsTest {
+    private final int filterPartition1 = 5;
+    private final int filterPartition2 = 6;
+    private final String topic = "test";
+    private final int partition = 1;
+    private final TopicPartition tp = new TopicPartition(topic, partition);
+    private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
+    private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
+    private final ConsumerRecord<Integer, Integer> consumerRecord = new ConsumerRecord<>(topic, partition, 0, 1, 1);
+    private int onCommitCount = 0;
+    private int onConsumeCount = 0;
+
+    /**
+     * Test consumer interceptor that filters records in onConsume() intercept
+     */
+    private class FilterConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
+        private int filterPartition;
+        private boolean throwExceptionOnConsume = false;
+        private boolean throwExceptionOnCommit = false;
+
+        FilterConsumerInterceptor(int filterPartition) {
+            this.filterPartition = filterPartition;
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+
+        @Override
+        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
+            onConsumeCount++;
+            if (throwExceptionOnConsume)
+                throw new KafkaException("Injected exception in FilterConsumerInterceptor.onConsume.");
+
+            // filters out topic/partitions with partition == filterPartition
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap = new HashMap<>();
+            for (TopicPartition tp : records.partitions()) {
+                if (tp.partition() != filterPartition)
+                    recordMap.put(tp, records.records(tp));
+            }
+            return new ConsumerRecords<K, V>(recordMap);
+        }
+
+        @Override
+        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            onCommitCount++;
+            if (throwExceptionOnCommit)
+                throw new KafkaException("Injected exception in FilterConsumerInterceptor.onCommit.");
+        }
+
+        @Override
+        public void close() {
+        }
+
+        // if 'on' is true, onConsume will always throw an exception
+        public void injectOnConsumeError(boolean on) {
+            throwExceptionOnConsume = on;
+        }
+
+        // if 'on' is true, onCommit will always throw an exception
+        public void injectOnCommitError(boolean on) {
+            throwExceptionOnCommit = on;
+        }
+    }
+
+    @Test
+    public void testOnConsumeChain() {
+        List<ConsumerInterceptor<Integer, Integer>>  interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor differently, which is not
+        // how it would be done in KafkaConsumer, but ok for testing interceptor callbacks
+        FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1);
+        FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);
+
+        // verify that onConsume modifies ConsumerRecords
+        Map<TopicPartition, List<ConsumerRecord<Integer, Integer>>> records = new HashMap<>();
+        List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
+        list1.add(consumerRecord);
+        List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
+        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 1, 1));
+        List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
+        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 1, 1));
+        records.put(tp, list1);
+        records.put(filterTopicPart1, list2);
+        records.put(filterTopicPart2, list3);
+        ConsumerRecords<Integer, Integer> consumerRecords = new ConsumerRecords<>(records);
+        ConsumerRecords<Integer, Integer> interceptedRecords = interceptors.onConsume(consumerRecords);
+        assertEquals(1, interceptedRecords.count());
+        assertTrue(interceptedRecords.partitions().contains(tp));
+        assertFalse(interceptedRecords.partitions().contains(filterTopicPart1));
+        assertFalse(interceptedRecords.partitions().contains(filterTopicPart2));
+        assertEquals(2, onConsumeCount);
+
+        // verify that even if one of the intermediate interceptors throws an exception, all interceptors' onConsume are called
+        interceptor1.injectOnConsumeError(true);
+        ConsumerRecords<Integer, Integer> partInterceptedRecs = interceptors.onConsume(consumerRecords);
+        assertEquals(2, partInterceptedRecs.count());
+        assertTrue(partInterceptedRecs.partitions().contains(filterTopicPart1));  // since interceptor1 threw exception
+        assertFalse(partInterceptedRecs.partitions().contains(filterTopicPart2)); // interceptor2 should still be called
+        assertEquals(4, onConsumeCount);
+
+        // if all interceptors throw an exception, records should be unmodified
+        interceptor2.injectOnConsumeError(true);
+        ConsumerRecords<Integer, Integer> noneInterceptedRecs = interceptors.onConsume(consumerRecords);
+        assertEquals(noneInterceptedRecs, consumerRecords);
+        assertEquals(3, noneInterceptedRecs.count());
+        assertEquals(6, onConsumeCount);
+
+        interceptors.close();
+    }
+
+    @Test
+    public void testOnCommitChain() {
+        List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor differently, which is not
+        // how it would be done in KafkaConsumer, but ok for testing interceptor callbacks
+        FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1);
+        FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);
+
+        // verify that onCommit is called for all interceptors in the chain
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(0));
+        interceptors.onCommit(offsets);
+        assertEquals(2, onCommitCount);
+
+        // verify that even if one of the interceptors throws an exception, all interceptors' onCommit are called
+        interceptor1.injectOnCommitError(true);
+        interceptors.onCommit(offsets);
+        assertEquals(4, onCommitCount);
+
+        interceptors.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 1130225..2dada8c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,4 +72,27 @@ public class KafkaProducerTest {
         Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
         Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
     }
+
+    @Test
+    public void testInterceptorConstructClose() throws Exception {
+        try {
+            Properties props = new Properties();
+            // test with client ID assigned by KafkaProducer
+            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
+            props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
+
+            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
+                    props, new StringSerializer(), new StringSerializer());
+            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
+
+            producer.close();
+            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
+        } finally {
+            // cleanup since we are using mutable static variables in MockProducerInterceptor
+            MockProducerInterceptor.resetCounters();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
new file mode 100644
index 0000000..18a455f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProducerInterceptorsTest {
+    private final TopicPartition tp = new TopicPartition("test", 0);
+    private final ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("test", 0, 1, "value");
+    private int onAckCount = 0;
+    private int onSendCount = 0;
+
+    private class AppendProducerInterceptor implements ProducerInterceptor<Integer, String> {
+        private String appendStr = "";
+        private boolean throwExceptionOnSend = false;
+        private boolean throwExceptionOnAck = false;
+
+        public AppendProducerInterceptor(String appendStr) {
+            this.appendStr = appendStr;
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+
+        @Override
+        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
+            onSendCount++;
+            if (throwExceptionOnSend)
+                throw new KafkaException("Injected exception in AppendProducerInterceptor.onSend");
+
+            ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
+                    record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
+            return newRecord;
+        }
+
+        @Override
+        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+            onAckCount++;
+            if (throwExceptionOnAck)
+                throw new KafkaException("Injected exception in AppendProducerInterceptor.onAcknowledgement");
+        }
+
+        @Override
+        public void close() {
+        }
+
+        // if 'on' is true, onSend will always throw an exception
+        public void injectOnSendError(boolean on) {
+            throwExceptionOnSend = on;
+        }
+
+        // if 'on' is true, onAcknowledgement will always throw an exception
+        public void injectOnAcknowledgementError(boolean on) {
+            throwExceptionOnAck = on;
+        }
+    }
+
+    @Test
+    public void testOnSendChain() {
+        List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor differently, which is not
+        // how it would be done in KafkaProducer, but ok for testing interceptor callbacks
+        AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
+        AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+
+        // verify that onSend() mutates the record as expected
+        ProducerRecord<Integer, String> interceptedRecord = interceptors.onSend(producerRecord);
+        assertEquals(2, onSendCount);
+        assertEquals(producerRecord.topic(), interceptedRecord.topic());
+        assertEquals(producerRecord.partition(), interceptedRecord.partition());
+        assertEquals(producerRecord.key(), interceptedRecord.key());
+        assertEquals(interceptedRecord.value(), producerRecord.value().concat("One").concat("Two"));
+
+        // onSend() mutates the same record the same way
+        ProducerRecord<Integer, String> anotherRecord = interceptors.onSend(producerRecord);
+        assertEquals(4, onSendCount);
+        assertEquals(interceptedRecord, anotherRecord);
+
+        // verify that if one of the interceptors throws an exception, other interceptors' callbacks are still called
+        interceptor1.injectOnSendError(true);
+        ProducerRecord<Integer, String> partInterceptRecord = interceptors.onSend(producerRecord);
+        assertEquals(6, onSendCount);
+        assertEquals(partInterceptRecord.value(), producerRecord.value().concat("Two"));
+
+        // verify the record remains unmodified if every onSend callback throws an exception
+        interceptor2.injectOnSendError(true);
+        ProducerRecord<Integer, String> noInterceptRecord = interceptors.onSend(producerRecord);
+        assertEquals(producerRecord, noInterceptRecord);
+
+        interceptors.close();
+    }
+
+    @Test
+    public void testOnAcknowledgementChain() {
+        List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor differently, which is not
+        // how it would be done in KafkaProducer, but ok for testing interceptor callbacks
+        AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
+        AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+
+        // verify onAck is called on all interceptors
+        RecordMetadata meta = new RecordMetadata(tp, 0, 0);
+        interceptors.onAcknowledgement(meta, null);
+        assertEquals(2, onAckCount);
+
+        // verify that onAcknowledgement exceptions do not propagate
+        interceptor1.injectOnAcknowledgementError(true);
+        interceptors.onAcknowledgement(meta, null);
+        assertEquals(4, onAckCount);
+
+        interceptor2.injectOnAcknowledgementError(true);
+        interceptors.onAcknowledgement(meta, null);
+        assertEquals(6, onAckCount);
+
+        interceptors.close();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
new file mode 100644
index 0000000..8295b54
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test-only {@link ConsumerInterceptor} for String keys and values.
+ * <p>
+ * It upper-cases the value of every consumed record and keeps static counters of how many instances were
+ * constructed, how many commits were intercepted and how many instances were closed. Because the counters are
+ * static, tests should call {@link #resetCounters()} when they are done with them.
+ */
+public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> {
+    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_COMMIT_COUNT = new AtomicInteger(0);
+
+    /** Creates the interceptor and records the construction in {@link #INIT_COUNT}. */
+    public MockConsumerInterceptor() {
+        INIT_COUNT.incrementAndGet();
+    }
+
+    /**
+     * Verifies that the consumer's client id was passed in. This is the consumer-side counterpart of the
+     * {@link ProducerConfig#CLIENT_ID_CONFIG} check performed by {@code MockProducerInterceptor}.
+     *
+     * @param configs configuration handed to the interceptor
+     * @throws ConfigException if {@link ConsumerConfig#CLIENT_ID_CONFIG} is missing from {@code configs}
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // clientId must be in configs
+        Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
+        if (clientIdValue == null)
+            // name the consumer-side key that was actually looked up
+            throw new ConfigException("Mock consumer interceptor expects configuration " + ConsumerConfig.CLIENT_ID_CONFIG);
+    }
+
+    /**
+     * Returns a new batch in which every record keeps its topic, partition, offset and key, but carries the
+     * upper-cased value.
+     *
+     * @param records records fetched by the consumer
+     * @return a new {@link ConsumerRecords} holding the rewritten records, grouped by partition
+     */
+    @Override
+    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
+        for (TopicPartition tp : records.partitions()) {
+            List<ConsumerRecord<String, String>> lst = new ArrayList<>();
+            for (ConsumerRecord<String, String> record: records.records(tp)) {
+                lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase()));
+            }
+            recordMap.put(tp, lst);
+        }
+        return new ConsumerRecords<>(recordMap);
+    }
+
+    /** Counts the intercepted commit in {@link #ON_COMMIT_COUNT}; the offsets themselves are not inspected. */
+    @Override
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        ON_COMMIT_COUNT.incrementAndGet();
+    }
+
+    /** Records the close in {@link #CLOSE_COUNT}. */
+    @Override
+    public void close() {
+        CLOSE_COUNT.incrementAndGet();
+    }
+
+    /** Sets all static counters back to zero. */
+    public static void resetCounters() {
+        INIT_COUNT.set(0);
+        CLOSE_COUNT.set(0);
+        ON_COMMIT_COUNT.set(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
new file mode 100644
index 0000000..cee1247
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Test-only {@link ProducerInterceptor} for String keys and values.
+ * <p>
+ * It appends a configured suffix to the value of every record passed to {@link #onSend(ProducerRecord)} and keeps
+ * static counters for constructions, sends, successful and failed acknowledgements, and closes. Because the
+ * counters are static, tests should call {@link #resetCounters()} when they are done with them.
+ */
+public class MockProducerInterceptor implements ProducerInterceptor<String, String> {
+    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
+    /** Configuration key whose String value is appended to every record value in {@link #onSend(ProducerRecord)}. */
+    public static final String APPEND_STRING_PROP = "mock.interceptor.append";
+    private String appendStr;
+
+    /** Creates the interceptor and records the construction in {@link #INIT_COUNT}. */
+    public MockProducerInterceptor() {
+        INIT_COUNT.incrementAndGet();
+    }
+
+    /**
+     * Reads the suffix to append and verifies that the producer's client id was passed in.
+     *
+     * @param configs configuration handed to the interceptor
+     * @throws ConfigException if {@link #APPEND_STRING_PROP} is missing or is not a String, or if
+     *                         {@link ProducerConfig#CLIENT_ID_CONFIG} is missing
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // ensure this method is called and expected configs are passed in
+        Object o = configs.get(APPEND_STRING_PROP);
+        if (o == null)
+            throw new ConfigException("Mock producer interceptor expects configuration " + APPEND_STRING_PROP);
+        // fail fast on a wrongly typed value; otherwise appendStr would stay null and every onSend call would
+        // fail later on concat(null)
+        if (!(o instanceof String))
+            throw new ConfigException("Mock producer interceptor expects a String value for configuration " + APPEND_STRING_PROP);
+        appendStr = (String) o;
+
+        // clientId also must be in configs
+        Object clientIdValue = configs.get(ProducerConfig.CLIENT_ID_CONFIG);
+        if (clientIdValue == null)
+            throw new ConfigException("Mock producer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+    }
+
+    /**
+     * Counts the send and returns a copy of the record whose value has the configured suffix appended.
+     * The counter is incremented before the record is touched, so a send is counted even if building the
+     * copy fails.
+     *
+     * @param record record handed to the producer
+     * @return a new record with the same topic, partition and key, and the suffixed value
+     */
+    @Override
+    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
+        ONSEND_COUNT.incrementAndGet();
+        return new ProducerRecord<>(
+                record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
+    }
+
+    /**
+     * Counts the acknowledgement as an error when {@code exception} is non-null, otherwise as a success when
+     * {@code metadata} is non-null. Nothing is counted when both are null.
+     */
+    @Override
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+        if (exception != null)
+            ON_ERROR_COUNT.incrementAndGet();
+        else if (metadata != null)
+            ON_SUCCESS_COUNT.incrementAndGet();
+    }
+
+    /** Records the close in {@link #CLOSE_COUNT}. */
+    @Override
+    public void close() {
+        CLOSE_COUNT.incrementAndGet();
+    }
+
+    /** Sets all static counters back to zero. */
+    public static void resetCounters() {
+        INIT_COUNT.set(0);
+        CLOSE_COUNT.set(0);
+        ONSEND_COUNT.set(0);
+        ON_SUCCESS_COUNT.set(0);
+        ON_ERROR_COUNT.set(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index eb24706..bc3a6ce 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -290,16 +290,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
   protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
                                       startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) {
-    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
-    val maxIters = numRecords * 300
-    var iters = 0
-    while (records.size < numRecords) {
-      for (record <- consumer.poll(50).asScala)
-        records.add(record)
-      if (iters > maxIters)
-        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
-      iters += 1
-    }
+    val records = consumeRecords(consumer, numRecords)
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
@@ -312,11 +303,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
-  protected def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
+  /**
+   * Polls `consumer` with a 50 ms timeout until at least `numRecords` records have been collected and returns
+   * them in the order they were received. Whole poll batches are appended, so the result may hold more than
+   * `numRecords` records.
+   *
+   * Throws IllegalStateException when the poll count exceeds `numRecords * 300` without enough records.
+   */
+  protected def consumeRecords[K, V](consumer: Consumer[K, V], numRecords: Int): ArrayList[ConsumerRecord[K, V]] = {
+    val consumed = new ArrayList[ConsumerRecord[K, V]]
+    val iterationLimit = numRecords * 300
+    var pollCount = 0
+    while (consumed.size < numRecords) {
+      consumer.poll(50).asScala.foreach(consumed.add(_))
+      // the limit is checked after the poll, exactly as before, so the last permitted poll still counts
+      if (pollCount > iterationLimit)
+        throw new IllegalStateException("Failed to consume the expected records after " + pollCount + " iterations.")
+      pollCount += 1
+    }
+    consumed
+  }
+
+  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback): Unit = {
     val startCount = commitCallback.count
     val started = System.currentTimeMillis()
     while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
-      this.consumers(0).poll(50)
+      consumer.poll(50)
     assertEquals(startCount + 1, commitCallback.count)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6711edf..b2f96e5 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,17 +12,21 @@
   */
 package kafka.api
 
+
+import java.util
 import java.util.Properties
+
 import java.util.regex.Pattern
 
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer, ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
+import org.apache.kafka.test.{MockProducerInterceptor, MockConsumerInterceptor}
 import org.junit.Assert._
 import org.junit.Test
 import scala.collection.mutable.Buffer
@@ -522,6 +526,144 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     runMultiConsumerSessionTimeoutTest(true)
   }
 
+  /**
+   * End-to-end check of the mock producer and consumer interceptors: the producer interceptor must see every
+   * send, every successful acknowledgement and the failed send of a null record; the consumer interceptor must
+   * rewrite consumed values and be notified of both synchronous and asynchronous commits.
+   */
+  @Test
+  def testInterceptors() {
+    // the mock counters are static; start from zero so the exact-count assertions below do not depend on
+    // which tests ran earlier in the same JVM
+    MockConsumerInterceptor.resetCounters()
+    MockProducerInterceptor.resetCounters()
+
+    val appendStr = "mock"
+    // create producer with interceptor
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
+    producerProps.put("mock.interceptor.append", appendStr)
+    val testProducer = new KafkaProducer[String,String](producerProps, new StringSerializer, new StringSerializer)
+
+    try {
+      // produce records
+      val numRecords = 10
+      (0 until numRecords).map { i =>
+        testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
+      }.foreach(_.get)
+      assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue())
+      assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue())
+      // send invalid record
+      try {
+        testProducer.send(null, null)
+        fail("Should not allow sending a null record")
+      } catch {
+        // catch Exception, not Throwable: the AssertionError raised by fail() above must not be swallowed here
+        case _: Exception => assertEquals("Interceptor should be notified about exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue())
+      }
+
+      // create consumer with interceptor
+      this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
+      val testConsumer = new KafkaConsumer[String, String](this.consumerConfig, new StringDeserializer(), new StringDeserializer())
+      try {
+        testConsumer.assign(List(tp).asJava)
+        testConsumer.seek(tp, 0)
+
+        // consume and verify that values are modified by interceptors
+        val records = consumeRecords(testConsumer, numRecords)
+        for (i <- 0 until numRecords) {
+          val record = records.get(i)
+          assertEquals(s"key $i", new String(record.key()))
+          assertEquals(s"value $i$appendStr".toUpperCase, new String(record.value()))
+        }
+
+        // commit sync and verify onCommit is called
+        val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+        testConsumer.commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava)
+        assertEquals(2, testConsumer.committed(tp).offset)
+        assertEquals(commitCountBefore+1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+
+        // commit async and verify onCommit is called
+        val commitCallback = new CountConsumerCommitCallback()
+        testConsumer.commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback)
+        awaitCommitCallback(testConsumer, commitCallback)
+        assertEquals(5, testConsumer.committed(tp).offset)
+        assertEquals(commitCountBefore+2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+      } finally {
+        // release the consumer even when an assertion above failed
+        testConsumer.close()
+      }
+    } finally {
+      testProducer.close()
+
+      // cleanup
+      MockConsumerInterceptor.resetCounters()
+      MockProducerInterceptor.resetCounters()
+    }
+  }
+
+  /**
+   * Verifies that the consumer interceptor's onCommit is invoked for auto-commits, both when a rebalance is
+   * triggered by a subscription change and when the consumer is closed.
+   */
+  @Test
+  def testAutoCommitIntercept() {
+    val topic2 = "topic2"
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
+
+    // produce records
+    val numRecords = 100
+    val testProducer = new KafkaProducer[String,String](this.producerConfig, new StringSerializer, new StringSerializer)
+    try {
+      (0 until numRecords).map { i =>
+        testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
+      }.foreach(_.get)
+
+      // create consumer with interceptor
+      this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+      this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
+      val testConsumer = new KafkaConsumer[String, String](this.consumerConfig, new StringDeserializer(), new StringDeserializer())
+      // close() is part of what this test verifies, so track it to avoid closing the consumer a second time
+      var closeAttempted = false
+      try {
+        val rebalanceListener = new ConsumerRebalanceListener {
+          override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+            // keep partitions paused in this test so that we can verify the commits based on specific seeks
+            partitions.asScala.foreach(testConsumer.pause(_))
+          }
+
+          override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
+        }
+        changeConsumerSubscriptionAndValidateAssignment(testConsumer, List(topic), Set(tp, tp2), rebalanceListener)
+        testConsumer.seek(tp, 10)
+        testConsumer.seek(tp2, 20)
+
+        // change subscription to trigger rebalance
+        val commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+        changeConsumerSubscriptionAndValidateAssignment(testConsumer,
+                                                        List(topic, topic2), Set(tp, tp2, new TopicPartition(topic2, 0),
+                                                        new TopicPartition(topic2, 1)),
+                                                        rebalanceListener)
+
+        // after rebalancing, we should have reset to the committed positions
+        assertEquals(10, testConsumer.committed(tp).offset)
+        assertEquals(20, testConsumer.committed(tp2).offset)
+        assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance)
+
+        // verify commits are intercepted on close
+        val commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+        closeAttempted = true
+        testConsumer.close()
+        assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeClose)
+      } finally {
+        // only reached with an open consumer when an assertion or call above failed
+        if (!closeAttempted)
+          testConsumer.close()
+      }
+    } finally {
+      testProducer.close()
+
+      // cleanup
+      MockConsumerInterceptor.resetCounters()
+    }
+  }
+
+  /**
+   * Configures the String-typed mock interceptors on a byte-array producer and consumer and verifies that
+   * producing and consuming still work and that the record value arrives unmodified.
+   */
+  @Test
+  def testInterceptorsWithWrongKeyValue() {
+    val appendStr = "mock"
+    // create producer with interceptor that has different key and value types from the producer
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
+    producerProps.put("mock.interceptor.append", appendStr)
+    val testProducer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
+
+    try {
+      // producing records should succeed
+      testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes))
+
+      // create consumer with interceptor that has different key and value types from the consumer
+      this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
+      val testConsumer = new KafkaConsumer[Array[Byte],Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+      try {
+        testConsumer.assign(List(tp).asJava)
+        testConsumer.seek(tp, 0)
+
+        // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated
+        val records = consumeRecords(testConsumer, 1)
+        val record = records.get(0)
+        assertEquals(s"value will not be modified", new String(record.value()))
+      } finally {
+        testConsumer.close()
+      }
+    } finally {
+      testProducer.close()
+
+      // cleanup: the mocks count constructions and onSend calls in static counters even when they later fail,
+      // so reset them to keep tests that assert exact counts independent of this one
+      MockConsumerInterceptor.resetCounters()
+      MockProducerInterceptor.resetCounters()
+    }
+  }
+
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic
@@ -705,4 +847,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         s"Did not get valid assignment for partitions ${subscriptions.asJava} after we changed subscription")
   }
 
+  /**
+   * Subscribes `consumer` to `topicsToSubscribe` with the given rebalance listener and then polls until the
+   * consumer's assignment equals `subscriptions`.
+   *
+   * NOTE(review): the failure message interpolates `consumer.assignment()` where the call is written; if
+   * `TestUtils.waitUntilTrue` takes the message by value, it shows the assignment from before the wait --
+   * confirm against TestUtils.
+   */
+  def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
+                                                            topicsToSubscribe: List[String],
+                                                            subscriptions: Set[TopicPartition],
+                                                            rebalanceListener: ConsumerRebalanceListener): Unit = {
+    consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+    TestUtils.waitUntilTrue(() => {
+      // the poll is what lets the consumer make progress; any records it returns are not needed here
+      consumer.poll(50)
+      consumer.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
+  }
+
 }