You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/01/30 03:39:34 UTC
[4/7] kafka git commit: KAFKA-1760: New consumer.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index fa88ac1..f50da82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -9,185 +9,174 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.MetricName;
/**
- * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
- * This class is <i> not threadsafe </i>
+ * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
+ * threadsafe </i>
* <p>
- * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it
- * needs to communicate with. Failure to close the consumer after use will leak these resources.
+ * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to
+ * communicate with. Failure to close the consumer after use will leak these resources.
*/
-public class MockConsumer implements Consumer<byte[], byte[]> {
+public class MockConsumer<K, V> implements Consumer<K, V> {
+
+ private final Map<String, List<PartitionInfo>> partitions;
+ private final SubscriptionState subscriptions;
+ private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+ private boolean closed;
- private final Set<TopicPartition> subscribedPartitions;
- private final Set<String> subscribedTopics;
- private final Map<TopicPartition, Long> committedOffsets;
- private final Map<TopicPartition, Long> consumedOffsets;
-
public MockConsumer() {
- subscribedPartitions = new HashSet<TopicPartition>();
- subscribedTopics = new HashSet<String>();
- committedOffsets = new HashMap<TopicPartition, Long>();
- consumedOffsets = new HashMap<TopicPartition, Long>();
+ this.subscriptions = new SubscriptionState();
+ this.partitions = new HashMap<String, List<PartitionInfo>>();
+ this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+ this.closed = false;
}
@Override
- public void subscribe(String... topics) {
- if(subscribedPartitions.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(String topic : topics) {
- subscribedTopics.add(topic);
- }
+ public synchronized Set<TopicPartition> subscriptions() {
+ return this.subscriptions.assignedPartitions();
}
@Override
- public void subscribe(TopicPartition... partitions) {
- if(subscribedTopics.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(TopicPartition partition : partitions) {
- subscribedPartitions.add(partition);
- consumedOffsets.put(partition, 0L);
- }
+ public synchronized void subscribe(String... topics) {
+ ensureNotClosed();
+ for (String topic : topics)
+ this.subscriptions.subscribe(topic);
}
- public void unsubscribe(String... topics) {
- // throw an exception if the topic was never subscribed to
- for(String topic:topics) {
- if(!subscribedTopics.contains(topic))
- throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
- " to unsubscribe(" + topic + ")");
- subscribedTopics.remove(topic);
- }
+ @Override
+ public synchronized void subscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ for (TopicPartition partition : partitions)
+ this.subscriptions.subscribe(partition);
}
- public void unsubscribe(TopicPartition... partitions) {
- // throw an exception if the partition was never subscribed to
- for(TopicPartition partition:partitions) {
- if(!subscribedPartitions.contains(partition))
- throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
- partition.topic() + "," + partition.partition() + ") should be called prior" +
- " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
- subscribedPartitions.remove(partition);
- committedOffsets.remove(partition);
- consumedOffsets.remove(partition);
- }
+ public synchronized void unsubscribe(String... topics) {
+ ensureNotClosed();
+ for (String topic : topics)
+ this.subscriptions.unsubscribe(topic);
+ }
+
+ public synchronized void unsubscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ for (TopicPartition partition : partitions)
+ this.subscriptions.unsubscribe(partition);
}
@Override
- public Map<String, ConsumerRecords<byte[], byte[]>> poll(long timeout) {
- // hand out one dummy record, 1 per topic
- Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>();
- Map<String, ConsumerRecords<byte[], byte[]>> recordMetadata = new HashMap<String, ConsumerRecords<byte[], byte[]>>();
- for(TopicPartition partition : subscribedPartitions) {
- // get the last consumed offset
- long messageSequence = consumedOffsets.get(partition);
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- ObjectOutputStream outputStream;
- try {
- outputStream = new ObjectOutputStream(byteStream);
- outputStream.writeLong(messageSequence++);
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- List<ConsumerRecord> recordsForTopic = records.get(partition.topic());
- if(recordsForTopic == null) {
- recordsForTopic = new ArrayList<ConsumerRecord>();
- records.put(partition.topic(), recordsForTopic);
- }
- recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence));
- consumedOffsets.put(partition, messageSequence);
+ public synchronized ConsumerRecords<K, V> poll(long timeout) {
+ ensureNotClosed();
+ // update the consumed offset
+ for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
+ List<ConsumerRecord<K, V>> recs = entry.getValue();
+ if (!recs.isEmpty())
+ this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
}
- for(Entry<String, List<ConsumerRecord>> recordsPerTopic : records.entrySet()) {
- Map<Integer, List<ConsumerRecord>> recordsPerPartition = new HashMap<Integer, List<ConsumerRecord>>();
- for(ConsumerRecord record : recordsPerTopic.getValue()) {
- List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(record.partition());
- if(recordsForThisPartition == null) {
- recordsForThisPartition = new ArrayList<ConsumerRecord>();
- recordsPerPartition.put(record.partition(), recordsForThisPartition);
- }
- recordsForThisPartition.add(record);
- }
- recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition));
+
+ ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
+ this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+ return copy;
+ }
+
+ public synchronized void addRecord(ConsumerRecord<K, V> record) {
+ ensureNotClosed();
+ TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ this.subscriptions.assignedPartitions().add(tp);
+ List<ConsumerRecord<K, V>> recs = this.records.get(tp);
+ if (recs == null) {
+ recs = new ArrayList<ConsumerRecord<K, V>>();
+ this.records.put(tp, recs);
}
- return recordMetadata;
+ recs.add(record);
}
@Override
- public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
- if(!sync)
- return null;
- for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
- committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
- }
- return new OffsetMetadata(committedOffsets, null);
+ public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+ ensureNotClosed();
+ for (Entry<TopicPartition, Long> entry : offsets.entrySet())
+ subscriptions.committed(entry.getKey(), entry.getValue());
}
@Override
- public OffsetMetadata commit(boolean sync) {
- if(!sync)
- return null;
- return commit(consumedOffsets, sync);
+ public synchronized void commit(CommitType commitType) {
+ ensureNotClosed();
+ commit(this.subscriptions.allConsumed(), commitType);
}
@Override
- public void seek(Map<TopicPartition, Long> offsets) {
- // change the fetch offsets
- for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
- consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
- }
+ public synchronized void seek(TopicPartition partition, long offset) {
+ ensureNotClosed();
+ subscriptions.seek(partition, offset);
}
@Override
- public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
- Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
- for(TopicPartition partition : partitions) {
- offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition));
- }
- return offsets;
+ public synchronized long committed(TopicPartition partition) {
+ ensureNotClosed();
+ return subscriptions.committed(partition);
}
@Override
- public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
- Map<TopicPartition, Long> positions = new HashMap<TopicPartition, Long>();
- for(TopicPartition partition : partitions) {
- positions.put(partition, consumedOffsets.get(partition));
- }
- return positions;
+ public synchronized long position(TopicPartition partition) {
+ ensureNotClosed();
+ return subscriptions.consumed(partition);
+ }
+
+ @Override
+ public synchronized void seekToBeginning(TopicPartition... partitions) {
+ ensureNotClosed();
+ throw new UnsupportedOperationException();
}
@Override
- public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp,
- Collection<TopicPartition> partitions) {
+ public synchronized void seekToEnd(TopicPartition... partitions) {
+ ensureNotClosed();
throw new UnsupportedOperationException();
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
- return null;
+ ensureNotClosed();
+ return Collections.emptyMap();
}
@Override
- public void close() {
- // unsubscribe from all partitions
- TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()];
- unsubscribe(subscribedPartitions.toArray(allPartitions));
+ public synchronized List<PartitionInfo> partitionsFor(String topic) {
+ ensureNotClosed();
+ List<PartitionInfo> parts = this.partitions.get(topic);
+ if (parts == null)
+ return Collections.emptyList();
+ else
+ return parts;
+ }
+
+ public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
+ ensureNotClosed();
+ this.partitions.put(topic, partitions);
+ }
+
+ @Override
+ public synchronized void close() {
+ ensureNotClosed();
+ this.closed = true;
+ }
+
+ private void ensureNotClosed() {
+ if (this.closed)
+ throw new IllegalStateException("This consumer has already been closed.");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
new file mode 100644
index 0000000..a21f97b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Indicates that there is no stored offset and no defined offset reset policy
+ */
+public class NoOffsetForPartitionException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+
+ public NoOffsetForPartitionException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
deleted file mode 100644
index ea423ad..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import java.util.Map;
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * The metadata for an offset commit that has been acknowledged by the server
- */
-public final class OffsetMetadata {
-
- private final Map<TopicPartition, Long> offsets;
- private final Map<TopicPartition, RuntimeException> errors;
-
- public OffsetMetadata(Map<TopicPartition, Long> offsets, Map<TopicPartition, RuntimeException> errors) {
- super();
- this.offsets = offsets;
- this.errors = errors;
- }
-
- public OffsetMetadata(Map<TopicPartition, Long> offsets) {
- this(offsets, null);
- }
-
- /**
- * The offset of the record in the topic/partition.
- */
- public long offset(TopicPartition partition) {
- if(this.errors != null)
- throw errors.get(partition);
- return offsets.get(partition);
- }
-
- /**
- * @return The exception corresponding to the error code returned by the server
- */
- public Exception error(TopicPartition partition) {
- if(errors != null)
- return errors.get(partition);
- else
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
new file mode 100644
index 0000000..d9483ec
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -0,0 +1,47 @@
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * A helper class for managing the heartbeat to the co-ordinator
+ */
+public final class Heartbeat {
+
+ /* The number of heartbeats to attempt to complete per session timeout interval.
+ * so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
+ * once per second.
+ */
+ private final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
+
+ private final long timeout;
+ private long lastHeartbeatSend;
+ private long lastHeartbeatResponse;
+
+ public Heartbeat(long timeout, long now) {
+ this.timeout = timeout;
+ this.lastHeartbeatSend = now;
+ this.lastHeartbeatResponse = now;
+ }
+
+ public void sentHeartbeat(long now) {
+ this.lastHeartbeatSend = now;
+ }
+
+ public void receivedResponse(long now) {
+ this.lastHeartbeatResponse = now;
+ }
+
+ public void markDead() {
+ this.lastHeartbeatResponse = -1;
+ }
+
+ public boolean isAlive(long now) {
+ return now - lastHeartbeatResponse <= timeout;
+ }
+
+ public boolean shouldHeartbeat(long now) {
+ return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
+ }
+
+ public long lastHeartbeatSend() {
+ return this.lastHeartbeatSend;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
new file mode 100644
index 0000000..7e57a39
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
+import org.apache.kafka.common.TopicPartition;
+
+public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
+
+ @Override
+ public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+
+ @Override
+ public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
new file mode 100644
index 0000000..71ce20d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -0,0 +1,166 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A class for tracking the topics, partitions, and offsets for the consumer
+ */
+public class SubscriptionState {
+
+ /* the list of topics the user has requested */
+ private final Set<String> subscribedTopics;
+
+ /* the list of partitions the user has requested */
+ private final Set<TopicPartition> subscribedPartitions;
+
+ /* the list of partitions currently assigned */
+ private final Set<TopicPartition> assignedPartitions;
+
+ /* the offset exposed to the user */
+ private final Map<TopicPartition, Long> consumed;
+
+ /* the current point we have fetched up to */
+ private final Map<TopicPartition, Long> fetched;
+
+ /* the last committed offset for each partition */
+ private final Map<TopicPartition, Long> committed;
+
+ /* do we need to request a partition assignment from the co-ordinator? */
+ private boolean needsPartitionAssignment;
+
+ public SubscriptionState() {
+ this.subscribedTopics = new HashSet<String>();
+ this.subscribedPartitions = new HashSet<TopicPartition>();
+ this.assignedPartitions = new HashSet<TopicPartition>();
+ this.consumed = new HashMap<TopicPartition, Long>();
+ this.fetched = new HashMap<TopicPartition, Long>();
+ this.committed = new HashMap<TopicPartition, Long>();
+ this.needsPartitionAssignment = false;
+ }
+
+ public void subscribe(String topic) {
+ if (this.subscribedPartitions.size() > 0)
+ throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
+ if (!this.subscribedTopics.contains(topic)) {
+ this.subscribedTopics.add(topic);
+ this.needsPartitionAssignment = true;
+ }
+ }
+
+ public void unsubscribe(String topic) {
+ if (!this.subscribedTopics.contains(topic))
+ throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
+ this.subscribedTopics.remove(topic);
+ this.needsPartitionAssignment = true;
+ for(TopicPartition tp: assignedPartitions())
+ if(topic.equals(tp.topic()))
+ clearPartition(tp);
+ }
+
+ public void subscribe(TopicPartition tp) {
+ if (this.subscribedTopics.size() > 0)
+ throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
+ this.subscribedPartitions.add(tp);
+ this.assignedPartitions.add(tp);
+ }
+
+ public void unsubscribe(TopicPartition partition) {
+ if (!subscribedPartitions.contains(partition))
+ throw new IllegalStateException("Partition " + partition + " was never subscribed to.");
+ subscribedPartitions.remove(partition);
+ clearPartition(partition);
+ }
+
+ private void clearPartition(TopicPartition tp) {
+ this.assignedPartitions.remove(tp);
+ this.committed.remove(tp);
+ this.fetched.remove(tp);
+ this.consumed.remove(tp);
+ }
+
+ public void clearAssignment() {
+ this.assignedPartitions.clear();
+ this.committed.clear();
+ this.fetched.clear();
+ this.needsPartitionAssignment = !subscribedTopics().isEmpty();
+ }
+
+ public Set<String> subscribedTopics() {
+ return this.subscribedTopics;
+ }
+
+ public Long fetched(TopicPartition tp) {
+ return this.fetched.get(tp);
+ }
+
+ public void fetched(TopicPartition tp, long offset) {
+ if (!this.assignedPartitions.contains(tp))
+ throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to.");
+ this.fetched.put(tp, offset);
+ }
+
+ public void committed(TopicPartition tp, long offset) {
+ this.committed.put(tp, offset);
+ }
+
+ public Long committed(TopicPartition tp) {
+ return this.committed.get(tp);
+ }
+
+ public void seek(TopicPartition tp, long offset) {
+ fetched(tp, offset);
+ consumed(tp, offset);
+ }
+
+ public Set<TopicPartition> assignedPartitions() {
+ return this.assignedPartitions;
+ }
+
+ public boolean partitionsAutoAssigned() {
+ return !this.subscribedTopics.isEmpty();
+ }
+
+ public void consumed(TopicPartition tp, long offset) {
+ if (!this.assignedPartitions.contains(tp))
+ throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to.");
+ this.consumed.put(tp, offset);
+ }
+
+ public Long consumed(TopicPartition partition) {
+ return this.consumed.get(partition);
+ }
+
+ public Map<TopicPartition, Long> allConsumed() {
+ return this.consumed;
+ }
+
+ public boolean hasAllFetchPositions() {
+ return this.fetched.size() >= this.assignedPartitions.size();
+ }
+
+ public Set<TopicPartition> missingFetchPositions() {
+ Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
+ copy.removeAll(this.fetched.keySet());
+ return copy;
+ }
+
+ public boolean needsPartitionAssignment() {
+ return this.needsPartitionAssignment;
+ }
+
+ public void changePartitionAssignment(List<TopicPartition> assignments) {
+ for (TopicPartition tp : assignments)
+ if (!this.subscribedTopics.contains(tp.topic()))
+ throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
+ this.clearAssignment();
+ this.assignedPartitions.addAll(assignments);
+ this.needsPartitionAssignment = false;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index fc71710..ebc4c53 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -329,8 +329,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
- ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
- int partition = partitioner.partition(serializedRecord, metadata.fetch());
+ int partition = partitioner.partition(record.topic(), serializedKey, record.partition(), metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 904976f..84530f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -100,7 +100,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
int partition = 0;
if (this.cluster.partitionsForTopic(record.topic()) != null)
- partition = partitioner.partition(record, this.cluster);
+ partition = partitioner.partition(record.topic(), record.key(), record.partition(), this.cluster);
ProduceRequestResult result = new ProduceRequestResult();
FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 8b3e565..9a43d66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,9 +16,9 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-import java.util.Arrays;
import java.util.Map;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -38,23 +38,17 @@ public class ProducerConfig extends AbstractConfig {
private static final ConfigDef config;
/** <code>bootstrap.servers</code> */
- public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
- private static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load " + "balanced over all servers irrespective of which servers are specified here for bootstrapping—this list only "
- + "impacts the initial hosts used to discover the full set of servers. This list should be in the form "
- + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
- + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
- + "servers (you may want more than one, though, in case a server is down). If no server in this list is available sending "
- + "data will fail until on becomes available.";
+ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
/** <code>metadata.fetch.timeout.ms</code> */
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
- private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata "
+ private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This "
+ "fetch to succeed before throwing an exception back to the client.";
/** <code>metadata.max.age.ms</code> */
- public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
- private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions.";
-
+ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+ private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+
/** <code>batch.size</code> */
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
@@ -113,17 +107,13 @@ public class ProducerConfig extends AbstractConfig {
+ "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.";
/** <code>client.id</code> */
- public static final String CLIENT_ID_CONFIG = "client.id";
- private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The "
- + "application can set any string it wants as this has no functional purpose other than in logging and metrics.";
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
/** <code>send.buffer.bytes</code> */
- public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
- private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data";
+ public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
/** <code>receive.buffer.bytes</code> */
- public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
- private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data";
+ public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
/** <code>max.request.size</code> */
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
@@ -131,8 +121,7 @@ public class ProducerConfig extends AbstractConfig {
+ "batches the producer will send in a single request to avoid sending huge requests.";
/** <code>reconnect.backoff.ms</code> */
- public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
- private static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host when a connection fails." + " This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop.";
+ public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
/** <code>block.on.buffer.full</code> */
public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
@@ -147,8 +136,7 @@ public class ProducerConfig extends AbstractConfig {
+ "may appear first.";
/** <code>retry.backoff.ms</code> */
- public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
- private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed produce request to a given topic partition." + " This avoids repeated sending-and-failing in a tight loop.";
+ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
@@ -156,17 +144,13 @@ public class ProducerConfig extends AbstractConfig {
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
/** <code>metrics.sample.window.ms</code> */
- public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
- private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
- + "When a window expires we erase and overwrite the oldest window.";
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
/** <code>metrics.num.samples</code> */
- public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
- private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
+ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
/** <code>metric.reporters</code> */
- public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
- private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
@@ -183,22 +167,22 @@ public class ProducerConfig extends AbstractConfig {
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
static {
- config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
+ config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Type.STRING,
"1",
- in(Arrays.asList("all", "-1", "0", "1")),
+ in("all","-1", "0", "1"),
Importance.HIGH,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
- .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
- .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC)
- .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+ .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
+ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
+ .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
.define(MAX_REQUEST_SIZE_CONFIG,
Type.INT,
1 * 1024 * 1024,
@@ -206,9 +190,9 @@ public class ProducerConfig extends AbstractConfig {
Importance.MEDIUM,
MAX_REQUEST_SIZE_DOC)
.define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
- .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
- .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC)
+ .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(METADATA_FETCH_TIMEOUT_CONFIG,
Type.LONG,
60 * 1000,
@@ -221,8 +205,8 @@ public class ProducerConfig extends AbstractConfig {
30000,
atLeast(0),
Importance.LOW,
- METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+ CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
Type.INT,
5,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index dcf4658..3aff624 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -78,9 +78,9 @@ public final class Metadata {
}
/**
- * The next time to update the cluster info is the maximum of the time the current info will expire
- * and the time the current info can be updated (i.e. backoff time has elapsed); If an update has
- * been request then the expiry time is now
+ * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
+ * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
+ * is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
@@ -120,6 +120,15 @@ public final class Metadata {
}
/**
+ * Add one or more topics to maintain metadata for
+ */
+ public synchronized void addTopics(String... topics) {
+ for (String topic : topics)
+ this.topics.add(topic);
+ requestUpdate();
+ }
+
+ /**
* Get the list of topics we are currently maintaining metadata for
*/
public synchronized Set<String> topics() {
@@ -137,6 +146,13 @@ public final class Metadata {
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
+
+ /**
+ * @return The current metadata version
+ */
+ public synchronized int version() {
+ return this.version;
+ }
/**
* The last time metadata was updated.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index 483899d..8112e6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -20,7 +20,6 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
@@ -40,32 +39,34 @@ public class Partitioner {
/**
* Compute the partition for the given record.
*
- * @param record The record being sent
+ * @param topic The topic name
+ * @param key The key to partition on (or null if no key)
+ * @param partition The partition to use (or null if none)
* @param cluster The current cluster metadata
*/
- public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
- List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
+ public int partition(String topic, byte[] key, Integer partition, Cluster cluster) {
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
- if (record.partition() != null) {
+ if (partition != null) {
// they have given us a partition, use it
- if (record.partition() < 0 || record.partition() >= numPartitions)
- throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
+ if (partition < 0 || partition >= numPartitions)
+ throw new IllegalArgumentException("Invalid partition given with record: " + partition
+ " is not in the range [0..."
+ numPartitions
+ "].");
- return record.partition();
- } else if (record.key() == null) {
+ return partition;
+ } else if (key == null) {
// choose the next available node in a round-robin fashion
for (int i = 0; i < numPartitions; i++) {
- int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
- if (partitions.get(partition).leader() != null)
- return partition;
+ int part = Utils.abs(counter.getAndIncrement()) % numPartitions;
+ if (partitions.get(part).leader() != null)
+ return part;
}
// no partitions are available, give a non-available partition
return Utils.abs(counter.getAndIncrement()) % numPartitions;
} else {
// hash the key to choose a partition
- return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
+ return Utils.abs(Utils.murmur2(key)) % numPartitions;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ccc03d8..8726809 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -146,7 +147,8 @@ public class Sender implements Runnable {
/**
* Run a single iteration of sending
*
- * @param now The current POSIX time in milliseconds
+ * @param now
+ * The current POSIX time in milliseconds
*/
public void run(long now) {
Cluster cluster = metadata.fetch();
@@ -169,9 +171,12 @@ public class Sender implements Runnable {
}
// create produce requests
- Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
+ Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
+ result.readyNodes,
+ this.maxRequestSize,
+ now);
+ sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
- sensors.updateProduceRequestMetrics(requests);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
@@ -183,18 +188,14 @@ public class Sender implements Runnable {
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
+ for (ClientRequest request : requests)
+ client.send(request);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
- List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
- for (ClientResponse response : responses) {
- if (response.wasDisconnected())
- handleDisconnect(response, now);
- else
- handleResponse(response, now);
- }
+ this.client.poll(pollTimeout, now);
}
/**
@@ -206,45 +207,44 @@ public class Sender implements Runnable {
this.wakeup();
}
- private void handleDisconnect(ClientResponse response, long now) {
- log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination());
- int correlation = response.request().request().header().correlationId();
- @SuppressWarnings("unchecked")
- Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
- for (RecordBatch batch : responseBatches.values())
- completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
- }
-
/**
* Handle a produce response
*/
- private void handleResponse(ClientResponse response, long now) {
+ private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
int correlationId = response.request().request().header().correlationId();
- log.trace("Received produce response from node {} with correlation id {}",
- response.request().request().destination(),
- correlationId);
- @SuppressWarnings("unchecked")
- Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
- // if we have a response, parse it
- if (response.hasResponse()) {
- ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
- for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
- TopicPartition tp = entry.getKey();
- ProduceResponse.PartitionResponse partResp = entry.getValue();
- Errors error = Errors.forCode(partResp.errorCode);
- RecordBatch batch = batches.get(tp);
- completeBatch(batch, error, partResp.baseOffset, correlationId, now);
- }
- this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
- } else {
- // this is the acks = 0 case, just complete all requests
+ if (response.wasDisconnected()) {
+ log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
+ .request()
+ .destination());
for (RecordBatch batch : batches.values())
- completeBatch(batch, Errors.NONE, -1L, correlationId, now);
+ completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
+ } else {
+ log.trace("Received produce response from node {} with correlation id {}",
+ response.request().request().destination(),
+ correlationId);
+ // if we have a response, parse it
+ if (response.hasResponse()) {
+ ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
+ for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses()
+ .entrySet()) {
+ TopicPartition tp = entry.getKey();
+ ProduceResponse.PartitionResponse partResp = entry.getValue();
+ Errors error = Errors.forCode(partResp.errorCode);
+ RecordBatch batch = batches.get(tp);
+ completeBatch(batch, error, partResp.baseOffset, correlationId, now);
+ }
+ this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
+ } else {
+ // this is the acks = 0 case, just complete all requests
+ for (RecordBatch batch : batches.values())
+ completeBatch(batch, Errors.NONE, -1L, correlationId, now);
+ }
}
}
/**
* Complete or retry the given batch of records.
+ *
* @param batch The record batch
* @param error The error (or null if none)
* @param baseOffset The base offset assigned to the records if successful
@@ -294,7 +294,7 @@ public class Sender implements Runnable {
*/
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
- Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
+ final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
ByteBuffer recordsBuffer = batch.records.buffer();
@@ -303,8 +303,15 @@ public class Sender implements Runnable {
recordsByPartition.put(tp, batch);
}
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
- RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct());
- return new ClientRequest(now, acks != 0, send, recordsByPartition);
+ RequestSend send = new RequestSend(destination,
+ this.client.nextRequestHeader(ApiKeys.PRODUCE),
+ request.toStruct());
+ RequestCompletionHandler callback = new RequestCompletionHandler() {
+ public void onComplete(ClientResponse response) {
+ handleProduceResponse(response, recordsByPartition, time.milliseconds());
+ }
+ };
+ return new ClientRequest(now, acks != 0, send, callback);
}
/**
@@ -428,44 +435,38 @@ public class Sender implements Runnable {
}
}
- public void updateProduceRequestMetrics(List<ClientRequest> requests) {
+ public void updateProduceRequestMetrics(Map<Integer, List<RecordBatch>> batches) {
long now = time.milliseconds();
- for (int i = 0; i < requests.size(); i++) {
- ClientRequest request = requests.get(i);
+ for (List<RecordBatch> nodeBatch : batches.values()) {
int records = 0;
-
- if (request.attachment() != null) {
- Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) request.attachment();
- for (RecordBatch batch : responseBatches.values()) {
-
- // register all per-topic metrics at once
- String topic = batch.topicPartition.topic();
- maybeRegisterTopicMetrics(topic);
-
- // per-topic record send rate
- String topicRecordsCountName = "topic." + topic + ".records-per-batch";
- Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName));
- topicRecordCount.record(batch.recordCount);
-
- // per-topic bytes send rate
- String topicByteRateName = "topic." + topic + ".bytes";
- Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
- topicByteRate.record(batch.records.sizeInBytes());
-
- // per-topic compression rate
- String topicCompressionRateName = "topic." + topic + ".compression-rate";
- Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
- topicCompressionRate.record(batch.records.compressionRate());
-
- // global metrics
- this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
- this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
- this.compressionRateSensor.record(batch.records.compressionRate());
- this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
- records += batch.recordCount;
- }
- this.recordsPerRequestSensor.record(records, now);
+ for (RecordBatch batch : nodeBatch) {
+ // register all per-topic metrics at once
+ String topic = batch.topicPartition.topic();
+ maybeRegisterTopicMetrics(topic);
+
+ // per-topic record send rate
+ String topicRecordsCountName = "topic." + topic + ".records-per-batch";
+ Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName));
+ topicRecordCount.record(batch.recordCount);
+
+ // per-topic bytes send rate
+ String topicByteRateName = "topic." + topic + ".bytes";
+ Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
+ topicByteRate.record(batch.records.sizeInBytes());
+
+ // per-topic compression rate
+ String topicCompressionRateName = "topic." + topic + ".compression-rate";
+ Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
+ topicCompressionRate.record(batch.records.compressionRate());
+
+ // global metrics
+ this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
+ this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
+ this.compressionRateSensor.record(batch.records.compressionRate());
+ this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
+ records += batch.recordCount;
}
+ this.recordsPerRequestSensor.record(records, now);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d3299b9..d7ccbcd 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -26,6 +26,7 @@ public final class Cluster {
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
+ private final Map<Integer, Node> nodesById;
/**
* Create a new cluster with the given nodes and partitions
@@ -37,6 +38,10 @@ public final class Cluster {
List<Node> copy = new ArrayList<Node>(nodes);
Collections.shuffle(copy);
this.nodes = Collections.unmodifiableList(copy);
+
+ this.nodesById = new HashMap<Integer, Node>();
+ for(Node node: nodes)
+ this.nodesById.put(node.id(), node);
// index the partitions by topic/partition for quick lookup
this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
@@ -97,6 +102,15 @@ public final class Cluster {
public List<Node> nodes() {
return this.nodes;
}
+
+ /**
+ * Get the node by the node id (or null if no such node exists)
+ * @param id The id of the node
+ * @return The node, or null if no such node exists
+ */
+ public Node nodeById(int id) {
+ return this.nodesById.get(id);
+ }
/**
* Get the current leader for the given topic-partition
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index b15aa2c..28562f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -69,10 +69,10 @@ public class PartitionInfo {
@Override
public String toString() {
- return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s",
+ return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s",
topic,
partition,
- leader.id(),
+ leader == null? "none" : leader.id(),
fmtNodeIds(replicas),
fmtNodeIds(inSyncReplicas));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 98cb79b..38ce10b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -21,6 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.kafka.common.utils.Utils;
+
/**
* This class is used for specifying the set of expected configurations, their type, their defaults, their
* documentation, and any special validation logic used for checking the correctness of the values the user provides.
@@ -292,39 +294,23 @@ public class ConfigDef {
this.validStrings = validStrings;
}
- public static ValidString in(List<String> validStrings) {
- return new ValidString(validStrings);
+ public static ValidString in(String... validStrings) {
+ return new ValidString(Arrays.asList(validStrings));
}
@Override
public void ensureValid(String name, Object o) {
-
String s = (String) o;
-
if (!validStrings.contains(s)) {
- throw new ConfigException(name,o,"String must be one of:" +join(validStrings));
+ throw new ConfigException(name,o,"String must be one of: " + Utils.join(validStrings, ", "));
}
}
public String toString() {
- return "[" + join(validStrings) + "]";
+ return "[" + Utils.join(validStrings, ", ") + "]";
}
- private String join(List<String> list)
- {
- StringBuilder sb = new StringBuilder();
- boolean first = true;
- for (String item : list)
- {
- if (first)
- first = false;
- else
- sb.append(",");
- sb.append(item);
- }
- return sb.toString();
- }
}
private static class ConfigKey {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
index 7c948b1..a566b90 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.KafkaException;
* Any API exception that is part of the public protocol and should be a subclass of this class and be part of this
* package.
*/
-public abstract class ApiException extends KafkaException {
+public class ApiException extends KafkaException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index b68bbf0..b5f8d83 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;
@@ -51,13 +47,17 @@ public interface Selectable {
public void close();
/**
- * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections,
- * disconnections, existing sends, and receives)
+ * Queue the given request for sending in the subsequent {@poll(long)} calls
+ * @param send The request to send
+ */
+ public void send(NetworkSend send);
+
+ /**
+ * Do I/O. Reads, writes, connection establishment, etc.
* @param timeout The amount of time to block if there is nothing to do
- * @param sends The new sends to initiate
* @throws IOException
*/
- public void poll(long timeout, List<NetworkSend> sends) throws IOException;
+ public void poll(long timeout) throws IOException;
/**
* The list of sends that completed on the last {@link #poll(long, List) poll()} call.
@@ -81,4 +81,26 @@ public interface Selectable {
*/
public List<Integer> connected();
+ /**
+ * Disable reads from the given connection
+ * @param id The id for the connection
+ */
+ public void mute(int id);
+
+ /**
+ * Re-enable reads from the given connection
+ * @param id The id for the connection
+ */
+ public void unmute(int id);
+
+ /**
+ * Disable reads from all connections
+ */
+ public void muteAll();
+
+ /**
+ * Re-enable reads from all connections
+ */
+ public void unmuteAll();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 74d695b..e18a769 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -12,6 +12,7 @@
*/
package org.apache.kafka.common.network;
+import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -81,6 +82,7 @@ public class Selector implements Selectable {
private final List<NetworkReceive> completedReceives;
private final List<Integer> disconnected;
private final List<Integer> connected;
+ private final List<Integer> failedSends;
private final Time time;
private final SelectorMetrics sensors;
private final String metricGrpPrefix;
@@ -103,6 +105,7 @@ public class Selector implements Selectable {
this.completedReceives = new ArrayList<NetworkReceive>();
this.connected = new ArrayList<Integer>();
this.disconnected = new ArrayList<Integer>();
+ this.failedSends = new ArrayList<Integer>();
this.sensors = new SelectorMetrics(metrics);
}
@@ -179,10 +182,26 @@ public class Selector implements Selectable {
}
/**
+ * Queue the given request for sending in the subsequent {@poll(long)} calls
+ * @param send The request to send
+ */
+ public void send(NetworkSend send) {
+ SelectionKey key = keyForId(send.destination());
+ Transmissions transmissions = transmissions(key);
+ if (transmissions.hasSend())
+ throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+ transmissions.send = send;
+ try {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } catch (CancelledKeyException e) {
+ close(key);
+ this.failedSends.add(send.destination());
+ }
+ }
+
+ /**
* Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
* disconnections, initiating new sends, or making progress on in-progress sends or receives.
- * <p>
- * The provided network sends will be started.
*
* When this call is completed the user can check for completed sends, receives, connections or disconnects using
* {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
@@ -190,29 +209,13 @@ public class Selector implements Selectable {
* completed I/O.
*
* @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
- * @param sends The list of new sends to begin
- *
* @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
* already an in-progress send
*/
@Override
- public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+ public void poll(long timeout) throws IOException {
clear();
- /* register for write interest on any new sends */
- for (NetworkSend send : sends) {
- SelectionKey key = keyForId(send.destination());
- Transmissions transmissions = transmissions(key);
- if (transmissions.hasSend())
- throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
- transmissions.send = send;
- try {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } catch (CancelledKeyException e) {
- close(key);
- }
- }
-
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
@@ -266,21 +269,34 @@ public class Selector implements Selectable {
}
/* cancel any defunct sockets */
- if (!key.isValid())
+ if (!key.isValid()) {
close(key);
+ this.disconnected.add(transmissions.id);
+ }
} catch (IOException e) {
- InetAddress remoteAddress = null;
- Socket socket = channel.socket();
- if (socket != null)
- remoteAddress = socket.getInetAddress();
- log.warn("Error in I/O with {}", remoteAddress , e);
+ String desc = socketDescription(channel);
+ if(e instanceof EOFException)
+ log.info("Connection {} disconnected", desc);
+ else
+ log.warn("Error in I/O with connection to {}", desc, e);
close(key);
+ this.disconnected.add(transmissions.id);
}
}
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}
+
+ private String socketDescription(SocketChannel channel) {
+ Socket socket = channel.socket();
+ if(socket == null)
+ return "[unconnected socket]";
+ else if(socket.getInetAddress() != null)
+ return socket.getInetAddress().toString();
+ else
+ return socket.getLocalAddress().toString();
+ }
@Override
public List<NetworkSend> completedSends() {
@@ -302,6 +318,36 @@ public class Selector implements Selectable {
return this.connected;
}
+ @Override
+ public void mute(int id) {
+ mute(this.keyForId(id));
+ }
+
+ private void mute(SelectionKey key) {
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+ }
+
+ @Override
+ public void unmute(int id) {
+ unmute(this.keyForId(id));
+ }
+
+ private void unmute(SelectionKey key) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+ }
+
+ @Override
+ public void muteAll() {
+ for (SelectionKey key : this.keys.values())
+ mute(key);
+ }
+
+ @Override
+ public void unmuteAll() {
+ for (SelectionKey key : this.keys.values())
+ unmute(key);
+ }
+
/**
* Clear the results from the prior poll
*/
@@ -310,6 +356,8 @@ public class Selector implements Selectable {
this.completedReceives.clear();
this.connected.clear();
this.disconnected.clear();
+ this.disconnected.addAll(this.failedSends);
+ this.failedSends.clear();
}
/**
@@ -335,7 +383,6 @@ public class Selector implements Selectable {
SocketChannel channel = channel(key);
Transmissions trans = transmissions(key);
if (trans != null) {
- this.disconnected.add(trans.id);
this.keys.remove(trans.id);
trans.clearReceive();
trans.clearSend();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 3316b6a..a8deac4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -19,36 +19,62 @@ package org.apache.kafka.common.protocol;
import java.util.HashMap;
import java.util.Map;
-import org.apache.kafka.common.errors.*;
-
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
- *
+ *
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
NONE(0, null),
- OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
- CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
- UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
+ OFFSET_OUT_OF_RANGE(1,
+ new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+ CORRUPT_MESSAGE(2,
+ new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+ UNKNOWN_TOPIC_OR_PARTITION(3,
+ new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
// TODO: errorCode 4 for InvalidFetchSize
- LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
- NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
+ LEADER_NOT_AVAILABLE(5,
+ new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+ NOT_LEADER_FOR_PARTITION(6,
+ new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
- // TODO: errorCode 8, 9, 11
- MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+ MESSAGE_TOO_LARGE(10,
+ new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
- // TODO: errorCode 14, 15, 16
- INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
- RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
- NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
- NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
+ OFFSET_LOAD_IN_PROGRESS(14, new ApiException("The coordinator is loading offsets and can't process requests.")),
+ CONSUMER_COORDINATOR_NOT_AVAILABLE(15, new ApiException("The coordinator is not available.")),
+ NOT_COORDINATOR_FOR_CONSUMER(16, new ApiException("This is not the correct co-ordinator for this consumer.")),
+ INVALID_TOPIC_EXCEPTION(17,
+ new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+ RECORD_LIST_TOO_LARGE(18,
+ new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
+ NOT_ENOUGH_REPLICAS(19,
+ new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
+ NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
+ new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
+
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
+
static {
for (Errors error : Errors.values()) {
codeToError.put(error.code(), error);
@@ -84,8 +110,9 @@ public enum Errors {
* Throw the exception corresponding to this error if there is one
*/
public void maybeThrow() {
- if (exception != null)
+ if (exception != null) {
throw this.exception;
+ }
}
/**