Posted to commits@camel.apache.org by da...@apache.org on 2016/03/09 10:10:56 UTC
[3/4] camel git commit: CAMEL-9467: Migrate camel-kafka to use java client instead of scala. Thanks to Anbumani Balusamy for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2aa831d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2aa831d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2aa831d
Branch: refs/heads/master
Commit: b2aa831da8c8f78f7d6ca908c5b33957bbc7fa24
Parents: 038e161
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 9 09:58:36 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 9 10:01:59 2016 +0100
----------------------------------------------------------------------
components/camel-kafka/pom.xml | 26 +-
.../camel/component/kafka/KafkaComponent.java | 22 +-
.../component/kafka/KafkaConfiguration.java | 1204 ++++++++++++------
.../camel/component/kafka/KafkaConstants.java | 13 +-
.../camel/component/kafka/KafkaConsumer.java | 183 +--
.../camel/component/kafka/KafkaEndpoint.java | 598 +++++----
.../camel/component/kafka/KafkaProducer.java | 64 +-
.../component/kafka/BaseEmbeddedKafkaTest.java | 21 +-
.../component/kafka/KafkaComponentTest.java | 224 ++--
.../kafka/KafkaConsumerBatchSizeTest.java | 64 +-
.../component/kafka/KafkaConsumerFullTest.java | 34 +-
.../component/kafka/KafkaConsumerTest.java | 8 +-
.../component/kafka/KafkaEndpointTest.java | 13 +-
.../component/kafka/KafkaProducerFullTest.java | 145 +--
.../component/kafka/KafkaProducerTest.java | 65 +-
.../component/kafka/SimplePartitioner.java | 39 -
.../kafka/embedded/EmbeddedKafkaCluster.java | 12 +-
.../src/test/resources/log4j.properties | 2 +-
18 files changed, 1618 insertions(+), 1119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 1a5f98f..f8bbdb5 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -15,7 +15,8 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -37,31 +38,35 @@
</properties>
<dependencies>
+
+ <!-- camel -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
+
+ <!-- kafka java client -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka-version}</version>
+ </dependency>
+
+ <!-- kafka server for testing using scala -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka-version}</version>
+ <scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
- <exclusion>
- <artifactId>scala-library</artifactId>
- <groupId>org.scala-lang</groupId>
- </exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <scope>provided</scope>
- </dependency>
+ <!-- test -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
@@ -77,6 +82,7 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index c9d4c2a..2981b3f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -20,8 +20,6 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.UriEndpointComponent;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.EndpointHelper;
public class KafkaComponent extends UriEndpointComponent {
@@ -34,30 +32,14 @@ public class KafkaComponent extends UriEndpointComponent {
}
@Override
- protected KafkaEndpoint createEndpoint(String uri,
- String remaining,
- Map<String, Object> params) throws Exception {
-
+ protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
String brokers = remaining.split("\\?")[0];
- Object confparam = params.get("configuration");
- if (confparam != null) {
- // need a special handling to resolve the reference before other parameters are set/merged into the config
- KafkaConfiguration confobj = null;
- if (confparam instanceof KafkaConfiguration) {
- confobj = (KafkaConfiguration)confparam;
- } else if (confparam instanceof String && EndpointHelper.isReferenceParameter((String)confparam)) {
- confobj = (KafkaConfiguration)CamelContextHelper.lookup(getCamelContext(), ((String)confparam).substring(1));
- }
- if (confobj != null) {
- endpoint.setConfiguration(confobj.copy());
- }
- params.remove("configuration");
- }
if (brokers != null) {
endpoint.getConfiguration().setBrokers(brokers);
}
setProperties(endpoint, params);
return endpoint;
}
+
}
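With the special "configuration" parameter handling removed, the endpoint URI alone drives setup: everything before the "?" becomes the broker list and the remaining query parameters are bound onto KafkaConfiguration via setProperties(). A minimal route against the migrated component might look as follows (broker address, topic and group id are illustrative placeholders, not part of this patch):

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;

    public class KafkaRouteExample {
        public static void main(String[] args) throws Exception {
            DefaultCamelContext context = new DefaultCamelContext();
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                    // "localhost:9092" ends up in KafkaConfiguration.setBrokers(),
                    // topic/groupId are merged in through setProperties(endpoint, params)
                    from("kafka:localhost:9092?topic=test&groupId=group1")
                        .to("log:kafka-received");
                }
            });
            context.start();
            Thread.sleep(5000);
            context.stop();
        }
    }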
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 894df0c..4a948c1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -18,28 +18,28 @@ package org.apache.camel.component.kafka;
import java.util.Properties;
-import kafka.producer.DefaultPartitioner;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
@UriParams
-public class KafkaConfiguration implements Cloneable {
+public class KafkaConfiguration {
+
+ @UriPath @Metadata(required = "true")
+ private String brokers;
- @UriParam
- private String zookeeperConnect;
- @UriParam
- private String zookeeperHost;
- @UriParam(defaultValue = "2181")
- private int zookeeperPort = 2181;
@UriParam @Metadata(required = "true")
private String topic;
@UriParam
private String groupId;
- @UriParam(defaultValue = "DefaultPartitioner")
- private String partitioner = DefaultPartitioner.class.getCanonicalName();
+ @UriParam(defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
+ private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER;
@UriParam(label = "consumer", defaultValue = "10")
private int consumerStreams = 10;
@UriParam(label = "consumer", defaultValue = "1")
@@ -53,134 +53,302 @@ public class KafkaConfiguration implements Cloneable {
@UriParam
private String clientId;
+ //key.deserializer
+ @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+ private String keyDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+ //value.deserializer
+ @UriParam(label = "consumer", defaultValue = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER)
+ private String valueDeserializer = KafkaConstants.KAFKA_DEFAULT_DESERIALIZER;
+ //fetch.min.bytes
+ @UriParam(label = "consumer", defaultValue = "1024")
+ private Integer fetchMinBytes = 1024;
+ //heartbeat.interval.ms
+ @UriParam(label = "consumer", defaultValue = "3000")
+ private Integer heartbeatIntervalMs = 3000;
+ //max.partition.fetch.bytes
+ @UriParam(label = "consumer", defaultValue = "1048576")
+ private Integer maxPartitionFetchBytes = 1048576;
+ //session.timeout.ms
+ @UriParam(label = "consumer", defaultValue = "30000")
+ private Integer sessionTimeoutMs = 30000;
+ //auto.offset.reset
+ @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none")
+ private String autoOffsetReset = "latest";
+ //partition.assignment.strategy
+ @UriParam(label = "consumer", defaultValue = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR)
+ private String partitionAssignor = KafkaConstants.PARTITIONER_RANGE_ASSIGNOR;
+ //request.timeout.ms
+ @UriParam(label = "consumer", defaultValue = "40000")
+ private Integer consumerRequestTimeoutMs = 40000;
+ //auto.commit.interval.ms
+ @UriParam(label = "consumer", defaultValue = "5000")
+ private Integer autoCommitIntervalMs = 5000;
+ //check.crcs
+ @UriParam(label = "consumer", defaultValue = "true")
+ private Boolean checkCrcs = true;
+ //fetch.max.wait.ms
+ @UriParam(label = "consumer", defaultValue = "500")
+ private Integer fetchWaitMaxMs = 500;
+
//Consumer configuration properties
@UriParam(label = "consumer")
private String consumerId;
- @UriParam(label = "consumer", defaultValue = "30000")
- private Integer socketTimeoutMs = 30 * 1000;
- @UriParam(label = "consumer", defaultValue = "" + 64 * 1024)
- private Integer socketReceiveBufferBytes = 64 * 1024;
- @UriParam(label = "consumer", defaultValue = "" + 1024 * 1024)
- private Integer fetchMessageMaxBytes = 1024 * 1024;
@UriParam(label = "consumer", defaultValue = "true")
private Boolean autoCommitEnable = true;
- @UriParam(label = "consumer", defaultValue = "60000")
- private Integer autoCommitIntervalMs = 60 * 1000;
- @UriParam(label = "consumer", defaultValue = "2")
- private Integer queuedMaxMessageChunks = 2;
- @UriParam(label = "consumer", defaultValue = "4")
- private Integer rebalanceMaxRetries = 4;
- @UriParam(label = "consumer", defaultValue = "1")
- private Integer fetchMinBytes = 1;
- @UriParam(label = "consumer", defaultValue = "100")
- private Integer fetchWaitMaxMs = 100;
- @UriParam(label = "consumer", defaultValue = "2000")
- private Integer rebalanceBackoffMs = 2000;
- @UriParam(label = "consumer", defaultValue = "200")
- private Integer refreshLeaderBackoffMs = 200;
- @UriParam(label = "consumer", defaultValue = "largest", enums = "smallest,largest,fail")
- private String autoOffsetReset = "largest";
- @UriParam(label = "consumer")
- private Integer consumerTimeoutMs;
- @UriParam(label = "consumer", defaultValue = "zookeeper", enums = "zookeeper,kafka")
- private String offsetsStorage = "zookeeper";
- @UriParam(label = "consumer", defaultValue = "true")
- private Boolean dualCommitEnabled = true;
-
- //Zookeepr configuration properties
- @UriParam
- private Integer zookeeperSessionTimeoutMs;
- @UriParam
- private Integer zookeeperConnectionTimeoutMs;
- @UriParam
- private Integer zookeeperSyncTimeMs;
//Producer configuration properties
- @UriPath
- private String brokers;
- @UriParam(label = "producer", defaultValue = "sync", enums = "async,sync")
- private String producerType = "sync";
- @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy")
- private String compressionCodec = "none";
- @UriParam(label = "producer")
- private String compressedTopics;
- @UriParam(label = "producer", defaultValue = "3")
- private Integer messageSendMaxRetries = 3;
@UriParam(label = "producer", defaultValue = "100")
private Integer retryBackoffMs = 100;
- @UriParam(label = "producer", defaultValue = "600000")
- private Integer topicMetadataRefreshIntervalMs = 600 * 1000;
-
- //Sync producer config
- @UriParam(label = "producer", defaultValue = "" + 100 * 1024)
- private Integer sendBufferBytes = 100 * 1024;
- @UriParam(label = "producer", defaultValue = "0")
- private short requestRequiredAcks;
- @UriParam(label = "producer", defaultValue = "10000")
- private Integer requestTimeoutMs = 10000;
//Async producer config
- @UriParam(label = "producer", defaultValue = "5000")
- private Integer queueBufferingMaxMs = 5000;
@UriParam(label = "producer", defaultValue = "10000")
private Integer queueBufferingMaxMessages = 10000;
@UriParam(label = "producer")
- private Integer queueEnqueueTimeoutMs;
- @UriParam(label = "producer", defaultValue = "200")
- private Integer batchNumMessages = 200;
- @UriParam(label = "producer")
private String serializerClass;
@UriParam(label = "producer")
private String keySerializerClass;
+ @UriParam(label = "producer", defaultValue = "1")
+ private Integer requestRequiredAcks = 1;
+ //buffer.memory
+ @UriParam(label = "producer", defaultValue = "33554432")
+ private Integer bufferMemorySize = 33554432;
+ //compression.type
+ @UriParam(label = "producer", defaultValue = "none", enums = "none,gzip,snappy,lz4")
+ private String compressionCodec = "none";
+ //retries
+ @UriParam(label = "producer", defaultValue = "0")
+ private Integer retries = 0;
+ // SSL
+ // ssl.key.password
+ @UriParam(label = "producer")
+ private String sslKeyPassword;
+ // ssl.keystore.location
+ @UriParam(label = "producer")
+ private String sslKeystoreLocation;
+ // ssl.keystore.password
+ @UriParam(label = "producer")
+ private String sslKeystorePassword;
+ //ssl.truststore.location
+ @UriParam(label = "producer")
+ private String sslTruststoreLocation;
+ //ssl.truststore.password
+ @UriParam(label = "producer")
+ private String sslTruststorePassword;
+ //batch.size
+ @UriParam(label = "producer", defaultValue = "16384")
+ private Integer producerBatchSize = 16384;
+ //connections.max.idle.ms
+ @UriParam(label = "producer", defaultValue = "540000")
+ private Integer connectionMaxIdleMs = 540000;
+ //linger.ms
+ @UriParam(label = "producer", defaultValue = "0")
+ private Integer lingerMs = 0;
+ //max.block.ms
+ @UriParam(label = "producer", defaultValue = "60000")
+ private Integer maxBlockMs = 60000;
+ //max.request.size
+ @UriParam(label = "producer", defaultValue = "1048576")
+ private Integer maxRequestSize = 1048576;
+ //receive.buffer.bytes
+ @UriParam(label = "producer", defaultValue = "32768")
+ private Integer receiveBufferBytes = 32768;
+ //request.timeout.ms
+ @UriParam(label = "producer", defaultValue = "30000")
+ private Integer requestTimeoutMs = 30000;
+ // SASL & sucurity Protocol
+ //sasl.kerberos.service.name
+ @UriParam(label = "producer")
+ private String saslKerberosServiceName;
+ //security.protocol
+ @UriParam(label = "producer", defaultValue = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL)
+ private String securityProtocol = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+ //send.buffer.bytes
+ @UriParam(label = "producer", defaultValue = "131072")
+ private Integer sendBufferBytes = 131072;
+ //SSL
+ //ssl.enabled.protocols
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)
+ private String sslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS;
+ //ssl.keystore.type
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE)
+ private String sslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE;
+ //ssl.protocol
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_PROTOCOL)
+ private String sslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL;
+ //ssl.provider
+ @UriParam(label = "producer")
+ private String sslProvider;
+ //ssl.truststore.type
+ @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
+ private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
+ //timeout.ms
+ @UriParam(label = "producer", defaultValue = "30000")
+ private Integer timeoutMs = 30000;
+ //block.on.buffer.full
+ @UriParam(label = "producer", defaultValue = "false")
+ private Boolean blockOnBufferFull = false;
+ //max.in.flight.requests.per.connection
+ @UriParam(label = "producer", defaultValue = "5")
+ private Integer maxInFlightRequest = 5;
+ //metadata.fetch.timeout.ms
+ @UriParam(label = "producer", defaultValue = "60000")
+ private Integer metadataFetchTimeoutMs = 60000;
+ //metadata.max.age.ms
+ @UriParam(label = "producer", defaultValue = "300000")
+ private Integer metadataMaxAgeMs = 300000;
+ //metric.reporters
+ @UriParam(label = "producer")
+ private String metricReporters;
+ //metrics.num.samples
+ @UriParam(label = "producer", defaultValue = "2")
+ private Integer noOfMetricsSample = 2;
+ //metrics.sample.window.ms
+ @UriParam(label = "producer", defaultValue = "30000")
+ private Integer metricsSampleWindowMs = 30000;
+ //reconnect.backoff.ms
+ @UriParam(label = "producer", defaultValue = "50")
+ private Integer reconnectBackoffMs = 50;
+ //SASL
+ //sasl.kerberos.kinit.cmd
+ @UriParam(label = "producer", defaultValue = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD)
+ private String kerberosInitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD;
+ //sasl.kerberos.min.time.before.relogin
+ @UriParam(label = "producer", defaultValue = "60000")
+ private Integer kerberosBeforeReloginMinTime = 60000;
+ //sasl.kerberos.ticket.renew.jitter
+ @UriParam(label = "producer", defaultValue = "0.05")
+ private Double kerberosRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER;
+ //sasl.kerberos.ticket.renew.window.factor
+ @UriParam(label = "producer", defaultValue = "0.8")
+ private Double kerberosRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
+ //SSL
+ //ssl.cipher.suites
+ @UriParam(label = "producer")
+ private String sslCipherSuites;
+ //ssl.endpoint.identification.algorithm
+ @UriParam(label = "producer")
+ private String sslEndpointAlgorithm;
+ //ssl.keymanager.algorithm
+ @UriParam(label = "producer", defaultValue = "SunX509")
+ private String sslKeymanagerAlgorithm = "SunX509";
+ //ssl.trustmanager.algorithm
+ @UriParam(label = "producer", defaultValue = "PKIX")
+ private String sslTrustmanagerAlgorithm = "PKIX";
+
public KafkaConfiguration() {
}
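Both factory methods below route every option through an addPropertyIfNotNull helper whose body falls outside this hunk. A plausible sketch of it, assuming it simply skips unset options so the Kafka client falls back to its own defaults (the generic signature is a guess matching the call sites):

    // Assumed helper shape: only forward options the user actually set,
    // converted to strings, which the Kafka client config parser accepts.
    private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
        if (value != null) {
            props.put(key, value.toString());
        }
    }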
public Properties createProducerProperties() {
Properties props = new Properties();
- addPropertyIfNotNull(props, "request.required.acks", getRequestRequiredAcks());
- addPropertyIfNotNull(props, "partitioner.class", getPartitioner());
- addPropertyIfNotNull(props, "serializer.class", getSerializerClass());
- addPropertyIfNotNull(props, "key.serializer.class", getKeySerializerClass());
- addPropertyIfNotNull(props, "request.timeout.ms", getRequestTimeoutMs());
- addPropertyIfNotNull(props, "producer.type", getProducerType());
- addPropertyIfNotNull(props, "compression.codec", getCompressionCodec());
- addPropertyIfNotNull(props, "compressed.topics", getCompressedTopics());
- addPropertyIfNotNull(props, "message.send.max.retries", getMessageSendMaxRetries());
- addPropertyIfNotNull(props, "retry.backoff.ms", getRetryBackoffMs());
- addPropertyIfNotNull(props, "topic.metadata.refresh.interval.ms", getTopicMetadataRefreshIntervalMs());
- addPropertyIfNotNull(props, "queue.buffering.max.ms", getQueueBufferingMaxMs());
- addPropertyIfNotNull(props, "queue.buffering.max.messages", getQueueBufferingMaxMessages());
- addPropertyIfNotNull(props, "queue.enqueue.timeout.ms", getQueueEnqueueTimeoutMs());
- addPropertyIfNotNull(props, "batch.num.messages", getBatchNumMessages());
- addPropertyIfNotNull(props, "send.buffer.bytes", getSendBufferBytes());
- addPropertyIfNotNull(props, "client.id", getClientId());
+ addPropertyIfNotNull(props, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, getKeySerializerClass());
+ addPropertyIfNotNull(props, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializerClass());
+ addPropertyIfNotNull(props, ProducerConfig.ACKS_CONFIG, getRequestRequiredAcks());
+ addPropertyIfNotNull(props, ProducerConfig.BUFFER_MEMORY_CONFIG, getBufferMemorySize());
+ addPropertyIfNotNull(props, ProducerConfig.COMPRESSION_TYPE_CONFIG, getCompressionCodec());
+ addPropertyIfNotNull(props, ProducerConfig.RETRIES_CONFIG, getRetries());
+ // SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+ addPropertyIfNotNull(props, ProducerConfig.BATCH_SIZE_CONFIG, getProducerBatchSize());
+ addPropertyIfNotNull(props, ProducerConfig.CLIENT_ID_CONFIG, getClientId());
+ addPropertyIfNotNull(props, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+ addPropertyIfNotNull(props, ProducerConfig.LINGER_MS_CONFIG, getLingerMs());
+ addPropertyIfNotNull(props, ProducerConfig.MAX_BLOCK_MS_CONFIG, getMaxBlockMs());
+ addPropertyIfNotNull(props, ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getMaxRequestSize());
+ addPropertyIfNotNull(props, ProducerConfig.PARTITIONER_CLASS_CONFIG, getPartitioner());
+ addPropertyIfNotNull(props, ProducerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+ addPropertyIfNotNull(props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, getRequestTimeoutMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+ // Security protocol
+ addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+ addPropertyIfNotNull(props, ProducerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+ addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
+ addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
+ addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
+ addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
+ addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+ addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+ addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+ addPropertyIfNotNull(props, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+ addPropertyIfNotNull(props, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+ addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
+
return props;
}
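Note that createProducerProperties() does not emit the broker list itself; the endpoint wires that in separately. A hedged usage sketch against the new java client (bootstrap servers, topic and payload are placeholders, and the String generics assume the default serializer resolves to the String serializer):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerPropertiesExample {
        public static void main(String[] args) {
            KafkaConfiguration config = new KafkaConfiguration();
            Properties props = config.createProducerProperties();
            // brokers are not set by createProducerProperties(); supply them explicitly
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("test", "key", "value"));
            producer.close();
        }
    }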
public Properties createConsumerProperties() {
Properties props = new Properties();
- addPropertyIfNotNull(props, "consumer.id", getConsumerId());
- addPropertyIfNotNull(props, "socket.timeout.ms", getSocketTimeoutMs());
- addPropertyIfNotNull(props, "socket.receive.buffer.bytes", getSocketReceiveBufferBytes());
- addPropertyIfNotNull(props, "fetch.message.max.bytes", getFetchMessageMaxBytes());
- addPropertyIfNotNull(props, "auto.commit.enable", isAutoCommitEnable());
- addPropertyIfNotNull(props, "auto.commit.interval.ms", getAutoCommitIntervalMs());
- addPropertyIfNotNull(props, "queued.max.message.chunks", getQueueBufferingMaxMessages());
- addPropertyIfNotNull(props, "fetch.min.bytes", getFetchMinBytes());
- addPropertyIfNotNull(props, "fetch.wait.max.ms", getFetchWaitMaxMs());
- addPropertyIfNotNull(props, "queued.max.message.chunks", getQueuedMaxMessageChunks());
- addPropertyIfNotNull(props, "rebalance.max.retries", getRebalanceMaxRetries());
- addPropertyIfNotNull(props, "rebalance.backoff.ms", getRebalanceBackoffMs());
- addPropertyIfNotNull(props, "refresh.leader.backoff.ms", getRefreshLeaderBackoffMs());
- addPropertyIfNotNull(props, "auto.offset.reset", getAutoOffsetReset());
- addPropertyIfNotNull(props, "consumer.timeout.ms", getConsumerTimeoutMs());
- addPropertyIfNotNull(props, "client.id", getClientId());
- addPropertyIfNotNull(props, "zookeeper.session.timeout.ms", getZookeeperSessionTimeoutMs());
- addPropertyIfNotNull(props, "zookeeper.connection.timeout.ms", getZookeeperConnectionTimeoutMs());
- addPropertyIfNotNull(props, "zookeeper.sync.time.ms", getZookeeperSyncTimeMs());
- addPropertyIfNotNull(props, "offsets.storage", getOffsetsStorage());
- addPropertyIfNotNull(props, "dual.commit.enabled", isDualCommitEnabled());
+ addPropertyIfNotNull(props, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, getKeyDeserializer());
+ addPropertyIfNotNull(props, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getValueDeserializer());
+ addPropertyIfNotNull(props, ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getFetchMinBytes());
+ addPropertyIfNotNull(props, ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
+ addPropertyIfNotNull(props, ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
+ addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
+ // SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, getSslKeyPassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, getSslKeystoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, getSslKeystorePassword());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getSslTruststoreLocation());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, getSslTruststorePassword());
+ addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
+ addPropertyIfNotNull(props, ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getConnectionMaxIdleMs());
+ addPropertyIfNotNull(props, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoCommitEnable());
+ addPropertyIfNotNull(props, ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, getPartitionAssignor());
+ addPropertyIfNotNull(props, ConsumerConfig.RECEIVE_BUFFER_CONFIG, getReceiveBufferBytes());
+ addPropertyIfNotNull(props, ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, getConsumerRequestTimeoutMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, getSaslKerberosServiceName());
+ // Security protocol
+ addPropertyIfNotNull(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
+ addPropertyIfNotNull(props, ConsumerConfig.SEND_BUFFER_CONFIG, getSendBufferBytes());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, getSslEnabledProtocols());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, getSslKeystoreType());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
+ addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
+ addPropertyIfNotNull(props, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, getAutoCommitIntervalMs());
+ addPropertyIfNotNull(props, ConsumerConfig.CHECK_CRCS_CONFIG, getCheckCrcs());
+ addPropertyIfNotNull(props, ConsumerConfig.CLIENT_ID_CONFIG, getClientId());
+ addPropertyIfNotNull(props, ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchWaitMaxMs());
+ addPropertyIfNotNull(props, ConsumerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
+ addPropertyIfNotNull(props, ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
+ addPropertyIfNotNull(props, ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
+ addPropertyIfNotNull(props, ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, getMetricsSampleWindowMs());
+ addPropertyIfNotNull(props, ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
+ addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, getRetryBackoffMs());
+ //SASL
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_KINIT_CMD, getKerberosInitCmd());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, getKerberosBeforeReloginMinTime());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, getKerberosRenewJitter());
+ addPropertyIfNotNull(props, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, getKerberosRenewWindowFactor());
+ //SSL
+ addPropertyIfNotNull(props, SslConfigs.SSL_CIPHER_SUITES_CONFIG, getSslCipherSuites());
+ addPropertyIfNotNull(props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, getSslEndpointAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, getSslKeymanagerAlgorithm());
+ addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, getSslTrustmanagerAlgorithm());
return props;
}
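The consumer side mirrors this: createConsumerProperties() covers the ConsumerConfig/SslConfigs/SaslConfigs keys, while brokers and group id come from the endpoint. A usage sketch under the same assumptions (all concrete values are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerPropertiesExample {
        public static void main(String[] args) {
            KafkaConfiguration config = new KafkaConfiguration();
            Properties props = config.createConsumerProperties();
            // brokers and group id are wired in by the endpoint, not this method
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
            consumer.close();
        }
    }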
@@ -191,65 +359,6 @@ public class KafkaConfiguration implements Cloneable {
}
}
- public String getZookeeperConnect() {
- if (this.zookeeperConnect != null) {
- return zookeeperConnect;
- } else {
- return getZookeeperHost() + ":" + getZookeeperPort();
- }
- }
-
- /**
- * Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server.
- * To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the
- * form hostname1:port1,hostname2:port2,hostname3:port3.
- * The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts its data
- * under some path in the global ZooKeeper namespace. If so the consumer should use the same chroot path in its connection string.
- * For example to give a chroot path of /chroot/path you would give the connection
- * string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
- */
- public void setZookeeperConnect(String zookeeperConnect) {
- this.zookeeperConnect = zookeeperConnect;
-
- // connect overrides host and port
- this.zookeeperHost = null;
- this.zookeeperPort = -1;
- }
-
- public String getZookeeperHost() {
- return zookeeperHost;
- }
-
- /**
- * The zookeeper host to use.
- * <p/>
- * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
- * <p/>
- * This option can only be used if zookeeperConnect is not in use.
- */
- public void setZookeeperHost(String zookeeperHost) {
- if (this.zookeeperConnect == null) {
- this.zookeeperHost = zookeeperHost;
- }
- }
-
- public int getZookeeperPort() {
- return zookeeperPort;
- }
-
- /**
- * The zookeeper port to use
- * <p/>
- * To connect to multiple zookeeper hosts use the zookeeperConnect option instead.
- * <p/>
- * This option can only be used if zookeeperConnect is not in use.
- */
- public void setZookeeperPort(int zookeeperPort) {
- if (this.zookeeperConnect == null) {
- this.zookeeperPort = zookeeperPort;
- }
- }
-
public String getGroupId() {
return groupId;
}
@@ -278,7 +387,7 @@ public class KafkaConfiguration implements Cloneable {
}
/**
- * Name of the topic to use
+ * Name of the topic to use.
*/
public void setTopic(String topic) {
this.topic = topic;
@@ -351,440 +460,741 @@ public class KafkaConfiguration implements Cloneable {
this.consumerId = consumerId;
}
- public Integer getSocketTimeoutMs() {
- return socketTimeoutMs;
+ public Boolean isAutoCommitEnable() {
+ return autoCommitEnable;
}
/**
- * The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms.
+ * If true, periodically commit the offset of messages already fetched by the consumer.
+ * This committed offset will be used when the process fails as the position from which the new consumer will begin.
*/
- public void setSocketTimeoutMs(Integer socketTimeoutMs) {
- this.socketTimeoutMs = socketTimeoutMs;
+ public void setAutoCommitEnable(Boolean autoCommitEnable) {
+ this.autoCommitEnable = autoCommitEnable;
}
- public Integer getSocketReceiveBufferBytes() {
- return socketReceiveBufferBytes;
+ public Integer getAutoCommitIntervalMs() {
+ return autoCommitIntervalMs;
}
/**
- * The socket receive buffer for network requests
+ * The frequency in ms that the consumer offsets are auto-committed to Kafka.
*/
- public void setSocketReceiveBufferBytes(Integer socketReceiveBufferBytes) {
- this.socketReceiveBufferBytes = socketReceiveBufferBytes;
+ public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
+ this.autoCommitIntervalMs = autoCommitIntervalMs;
}
- public Integer getFetchMessageMaxBytes() {
- return fetchMessageMaxBytes;
+ public Integer getFetchMinBytes() {
+ return fetchMinBytes;
}
/**
- * The number of byes of messages to attempt to fetch for each topic-partition in each fetch request.
- * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer.
- * The fetch request size must be at least as large as the maximum message size the server allows or else it
- * is possible for the producer to send messages larger than the consumer can fetch.
+ * The minimum amount of data the server should return for a fetch request.
+ * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
*/
- public void setFetchMessageMaxBytes(Integer fetchMessageMaxBytes) {
- this.fetchMessageMaxBytes = fetchMessageMaxBytes;
+ public void setFetchMinBytes(Integer fetchMinBytes) {
+ this.fetchMinBytes = fetchMinBytes;
}
- public Boolean isAutoCommitEnable() {
- return autoCommitEnable;
+ public Integer getFetchWaitMaxMs() {
+ return fetchWaitMaxMs;
}
/**
- * If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
- * This committed offset will be used when the process fails as the position from which the new consumer will begin.
+ * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
*/
- public void setAutoCommitEnable(Boolean autoCommitEnable) {
- this.autoCommitEnable = autoCommitEnable;
+ public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
+ this.fetchWaitMaxMs = fetchWaitMaxMs;
}
- public Integer getAutoCommitIntervalMs() {
- return autoCommitIntervalMs;
+ public String getAutoOffsetReset() {
+ return autoOffsetReset;
}
/**
- * The frequency in ms that the consumer offsets are committed to zookeeper.
+ * What to do when there is no initial offset or if the current offset is out of range:
+ * earliest : automatically reset the offset to the earliest offset
+ * latest : automatically reset the offset to the latest offset
+ * none : throw exception to the consumer
*/
- public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
- this.autoCommitIntervalMs = autoCommitIntervalMs;
+ public void setAutoOffsetReset(String autoOffsetReset) {
+ this.autoOffsetReset = autoOffsetReset;
}
- public Integer getQueuedMaxMessageChunks() {
- return queuedMaxMessageChunks;
+ public String getBrokers() {
+ return brokers;
}
/**
- * Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.
+ * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
+ * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
+ * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
+ * <p/>
+ * This option is known as <tt>bootstrap.servers</tt> in the Kafka documentation.
*/
- public void setQueuedMaxMessageChunks(Integer queuedMaxMessageChunks) {
- this.queuedMaxMessageChunks = queuedMaxMessageChunks;
+ public void setBrokers(String brokers) {
+ this.brokers = brokers;
}
- public Integer getRebalanceMaxRetries() {
- return rebalanceMaxRetries;
+ public String getCompressionCodec() {
+ return compressionCodec;
}
/**
- * When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
- * If the set of consumers changes while this assignment is taking place the rebalance will fail and retry.
- * This setting controls the maximum number of attempts before giving up.
+ * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
*/
- public void setRebalanceMaxRetries(Integer rebalanceMaxRetries) {
- this.rebalanceMaxRetries = rebalanceMaxRetries;
+ public void setCompressionCodec(String compressionCodec) {
+ this.compressionCodec = compressionCodec;
}
- public Integer getFetchMinBytes() {
- return fetchMinBytes;
+ public Integer getRetryBackoffMs() {
+ return retryBackoffMs;
}
/**
- * The minimum amount of data the server should return for a fetch request.
- * If insufficient data is available the request will wait for that much data to accumulate before answering the request.
+ * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
+ * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
*/
- public void setFetchMinBytes(Integer fetchMinBytes) {
- this.fetchMinBytes = fetchMinBytes;
+ public void setRetryBackoffMs(Integer retryBackoffMs) {
+ this.retryBackoffMs = retryBackoffMs;
}
- public Integer getFetchWaitMaxMs() {
- return fetchWaitMaxMs;
+ public Integer getSendBufferBytes() {
+ return sendBufferBytes;
}
/**
- * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes
+ * Socket write buffer size
*/
- public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
- this.fetchWaitMaxMs = fetchWaitMaxMs;
+ public void setSendBufferBytes(Integer sendBufferBytes) {
+ this.sendBufferBytes = sendBufferBytes;
}
- public Integer getRebalanceBackoffMs() {
- return rebalanceBackoffMs;
+ public Integer getRequestTimeoutMs() {
+ return requestTimeoutMs;
}
/**
- * Backoff time between retries during rebalance.
+ * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
*/
- public void setRebalanceBackoffMs(Integer rebalanceBackoffMs) {
- this.rebalanceBackoffMs = rebalanceBackoffMs;
+ public void setRequestTimeoutMs(Integer requestTimeoutMs) {
+ this.requestTimeoutMs = requestTimeoutMs;
}
- public Integer getRefreshLeaderBackoffMs() {
- return refreshLeaderBackoffMs;
+ public Integer getQueueBufferingMaxMessages() {
+ return queueBufferingMaxMessages;
}
/**
- * Backoff time to wait before trying to determine the leader of a partition that has just lost its leader.
+ * The maximum number of unsent messages that can be queued up by the producer when using async
+ * mode before either the producer must be blocked or data must be dropped.
*/
- public void setRefreshLeaderBackoffMs(Integer refreshLeaderBackoffMs) {
- this.refreshLeaderBackoffMs = refreshLeaderBackoffMs;
+ public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
+ this.queueBufferingMaxMessages = queueBufferingMaxMessages;
}
- public String getAutoOffsetReset() {
- return autoOffsetReset;
+ public String getSerializerClass() {
+ if (serializerClass == null) {
+ return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+ }
+ return serializerClass;
}
/**
- * What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
- * smallest : automatically reset the offset to the smallest offset
- * largest : automatically reset the offset to the largest offset
- * fail: throw exception to the consumer
+ * The serializer class for messages. When not set, it falls back to
+ * KafkaConstants.KAFKA_DEFAULT_SERIALIZER.
*/
- public void setAutoOffsetReset(String autoOffsetReset) {
- this.autoOffsetReset = autoOffsetReset;
+ public void setSerializerClass(String serializerClass) {
+ this.serializerClass = serializerClass;
}
- public Integer getConsumerTimeoutMs() {
- return consumerTimeoutMs;
+ public String getKeySerializerClass() {
+ if (keySerializerClass == null) {
+ return KafkaConstants.KAFKA_DEFAULT_SERIALIZER;
+ }
+ return keySerializerClass;
}
/**
- * Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
+ * The serializer class for keys (defaults to the same as for messages if nothing is given).
*/
- public void setConsumerTimeoutMs(Integer consumerTimeoutMs) {
- this.consumerTimeoutMs = consumerTimeoutMs;
+ public void setKeySerializerClass(String keySerializerClass) {
+ this.keySerializerClass = keySerializerClass;
}
- public Integer getZookeeperSessionTimeoutMs() {
- return zookeeperSessionTimeoutMs;
+ public String getKerberosInitCmd() {
+ return kerberosInitCmd;
}
/**
- * ZooKeeper session timeout. If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
+ * Kerberos kinit command path. Default is /usr/bin/kinit
*/
- public void setZookeeperSessionTimeoutMs(Integer zookeeperSessionTimeoutMs) {
- this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
+ public void setKerberosInitCmd(String kerberosInitCmd) {
+ this.kerberosInitCmd = kerberosInitCmd;
}
- public Integer getZookeeperConnectionTimeoutMs() {
- return zookeeperConnectionTimeoutMs;
+ public Integer getKerberosBeforeReloginMinTime() {
+ return kerberosBeforeReloginMinTime;
}
/**
- * The max time that the client waits while establishing a connection to zookeeper.
+ * Login thread sleep time between refresh attempts.
*/
- public void setZookeeperConnectionTimeoutMs(Integer zookeeperConnectionTimeoutMs) {
- this.zookeeperConnectionTimeoutMs = zookeeperConnectionTimeoutMs;
+ public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) {
+ this.kerberosBeforeReloginMinTime = kerberosBeforeReloginMinTime;
}
- public Integer getZookeeperSyncTimeMs() {
- return zookeeperSyncTimeMs;
+ public Double getKerberosRenewJitter() {
+ return kerberosRenewJitter;
}
/**
- * How far a ZK follower can be behind a ZK leader
+ * Percentage of random jitter added to the renewal time.
*/
- public void setZookeeperSyncTimeMs(Integer zookeeperSyncTimeMs) {
- this.zookeeperSyncTimeMs = zookeeperSyncTimeMs;
+ public void setKerberosRenewJitter(Double kerberosRenewJitter) {
+ this.kerberosRenewJitter = kerberosRenewJitter;
}
- public String getBrokers() {
- return brokers;
+ public Double getKerberosRenewWindowFactor() {
+ return kerberosRenewWindowFactor;
}
/**
- * This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas).
- * The socket connections for sending the actual data will be established based on the broker information returned in the metadata.
- * The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.
- * <p/>
- * This option is known as <tt>metadata.broker.list</tt> in the Kafka documentation.
+ * Login thread will sleep until the specified window factor of time from last
+ * refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.
*/
- public void setBrokers(String brokers) {
- this.brokers = brokers;
+ public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) {
+ this.kerberosRenewWindowFactor = kerberosRenewWindowFactor;
}
- public String getProducerType() {
- return producerType;
+ public String getSslCipherSuites() {
+ return sslCipherSuites;
}
/**
- * This parameter specifies whether the messages are sent asynchronously in a background thread.
- * Valid values are (1) async for asynchronous send and (2) sync for synchronous send.
- * By setting the producer to async we allow batching together of requests (which is great for throughput)
- * but open the possibility of a failure of the client machine dropping unsent data.
+ * A list of cipher suites. This is a named combination of authentication, encryption,
+ * MAC and key exchange algorithm used to negotiate the security settings for a network connection
+ * using TLS or SSL network protocol. By default all the available cipher suites are supported.
*/
- public void setProducerType(String producerType) {
- this.producerType = producerType;
+ public void setSslCipherSuites(String sslCipherSuites) {
+ this.sslCipherSuites = sslCipherSuites;
}
- public String getCompressionCodec() {
- return compressionCodec;
+ public String getSslEndpointAlgorithm() {
+ return sslEndpointAlgorithm;
}
/**
- * This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".
+ * The endpoint identification algorithm to validate server hostname using server certificate.
*/
- public void setCompressionCodec(String compressionCodec) {
- this.compressionCodec = compressionCodec;
+ public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
+ this.sslEndpointAlgorithm = sslEndpointAlgorithm;
}
- public String getCompressedTopics() {
- return compressedTopics;
+ public String getSslKeymanagerAlgorithm() {
+ return sslKeymanagerAlgorithm;
}
/**
- * This parameter allows you to set whether compression should be turned on for particular topics.
- * If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any.
- * If the list of compressed topics is empty, then enable the specified compression codec for all topics.
- * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+ * The algorithm used by key manager factory for SSL connections. Default value is the key
+ * manager factory algorithm configured for the Java Virtual Machine.
*/
- public void setCompressedTopics(String compressedTopics) {
- this.compressedTopics = compressedTopics;
+ public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
+ this.sslKeymanagerAlgorithm = sslKeymanagerAlgorithm;
}
- public Integer getMessageSendMaxRetries() {
- return messageSendMaxRetries;
+ public String getSslTrustmanagerAlgorithm() {
+ return sslTrustmanagerAlgorithm;
}
/**
- * This property will cause the producer to automatically retry a failed send request.
- * This property specifies the number of retries when such failures occur. Note that setting a non-zero value here
- * can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.
+ * The algorithm used by trust manager factory for SSL connections. Default value is the
+ * trust manager factory algorithm configured for the Java Virtual Machine.
*/
- public void setMessageSendMaxRetries(Integer messageSendMaxRetries) {
- this.messageSendMaxRetries = messageSendMaxRetries;
+ public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
+ this.sslTrustmanagerAlgorithm = sslTrustmanagerAlgorithm;
}
- public Integer getRetryBackoffMs() {
- return retryBackoffMs;
+ public String getSslEnabledProtocols() {
+ return sslEnabledProtocols;
}
/**
- * Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected.
- * Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.
+ * The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.
*/
- public void setRetryBackoffMs(Integer retryBackoffMs) {
- this.retryBackoffMs = retryBackoffMs;
+ public void setSslEnabledProtocols(String sslEnabledProtocols) {
+ this.sslEnabledProtocols = sslEnabledProtocols;
}
- public Integer getTopicMetadataRefreshIntervalMs() {
- return topicMetadataRefreshIntervalMs;
+ public String getSslKeystoreType() {
+ return sslKeystoreType;
}
/**
- * The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing,
- * leader not available...). It will also poll regularly (default: every 10min so 600000ms).
- * If you set this to a negative value, metadata will only get refreshed on failure.
- * If you set this to zero, the metadata will get refreshed after each message sent (not recommended).
- * Important note: the refresh happen only AFTER the message is sent, so if the producer never
- * sends a message the metadata is never refreshed
+ * The file format of the key store file. This is optional for client. Default value is JKS
*/
- public void setTopicMetadataRefreshIntervalMs(Integer topicMetadataRefreshIntervalMs) {
- this.topicMetadataRefreshIntervalMs = topicMetadataRefreshIntervalMs;
+ public void setSslKeystoreType(String sslKeystoreType) {
+ this.sslKeystoreType = sslKeystoreType;
}
- public Integer getSendBufferBytes() {
- return sendBufferBytes;
+ public String getSslProtocol() {
+ return sslProtocol;
}
/**
- * Socket write buffer size
+ * The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases.
+ * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs,
+ * but their usage is discouraged due to known security vulnerabilities.
*/
- public void setSendBufferBytes(Integer sendBufferBytes) {
- this.sendBufferBytes = sendBufferBytes;
+ public void setSslProtocol(String sslProtocol) {
+ this.sslProtocol = sslProtocol;
+ }
+
+ public String getSslProvider() {
+ return sslProvider;
+ }
+
+ /**
+ * The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
+ */
+ public void setSslProvider(String sslProvider) {
+ this.sslProvider = sslProvider;
+ }
+
+ public String getSslTruststoreType() {
+ return sslTruststoreType;
+ }
+
+ /**
+ * The file format of the trust store file. Default value is JKS.
+ */
+ public void setSslTruststoreType(String sslTruststoreType) {
+ this.sslTruststoreType = sslTruststoreType;
+ }
+
+ public String getSaslKerberosServiceName() {
+ return saslKerberosServiceName;
+ }
+
+ /**
+ * The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS
+ * config or in Kafka's config.
+ */
+ public void setSaslKerberosServiceName(String saslKerberosServiceName) {
+ this.saslKerberosServiceName = saslKerberosServiceName;
+ }
+
+ public String getSecurityProtocol() {
+ return securityProtocol;
+ }
+
+ /**
+ * Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.
+ */
+ public void setSecurityProtocol(String securityProtocol) {
+ this.securityProtocol = securityProtocol;
+ }
+
+ public String getSslKeyPassword() {
+ return sslKeyPassword;
+ }
+
+ /**
+ * The password of the private key in the key store file. This is optional for client.
+ */
+ public void setSslKeyPassword(String sslKeyPassword) {
+ this.sslKeyPassword = sslKeyPassword;
+ }
+
+ public String getSslKeystoreLocation() {
+ return sslKeystoreLocation;
}
- public short getRequestRequiredAcks() {
+ /**
+ * The location of the key store file. This is optional for client and can be used for two-way
+ * authentication for client.
+ */
+ public void setSslKeystoreLocation(String sslKeystoreLocation) {
+ this.sslKeystoreLocation = sslKeystoreLocation;
+ }
+
+ public String getSslKeystorePassword() {
+ return sslKeystorePassword;
+ }
+
+ /**
+ * The store password for the key store file. This is optional for client and only needed
+ * if ssl.keystore.location is configured.
+ */
+ public void setSslKeystorePassword(String sslKeystorePassword) {
+ this.sslKeystorePassword = sslKeystorePassword;
+ }
+
+ public String getSslTruststoreLocation() {
+ return sslTruststoreLocation;
+ }
+
+ /**
+ * The location of the trust store file.
+ */
+ public void setSslTruststoreLocation(String sslTruststoreLocation) {
+ this.sslTruststoreLocation = sslTruststoreLocation;
+ }
+
+ public String getSslTruststorePassword() {
+ return sslTruststorePassword;
+ }
+
+
+ /**
+ * The password for the trust store file.
+ */
+ public void setSslTruststorePassword(String sslTruststorePassword) {
+ this.sslTruststorePassword = sslTruststorePassword;
+ }
+
+ public Integer getBufferMemorySize() {
+ return bufferMemorySize;
+ }
+
+ /**
+ * The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
+ * If records are sent faster than they can be delivered to the server the producer will either block
+ * or throw an exception based on the preference specified by block.on.buffer.full. This setting should
+ * correspond roughly to the total memory the producer will use, but is not a hard bound since not all
+ * memory the producer uses is used for buffering. Some additional memory will be used for compression
+ * (if compression is enabled) as well as for maintaining in-flight requests.
+ */
+ public void setBufferMemorySize(Integer bufferMemorySize) {
+ this.bufferMemorySize = bufferMemorySize;
+ }
+
+ public Boolean getBlockOnBufferFull() {
+ return blockOnBufferFull;
+ }
+
+ /**
+ * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
+ * By default this setting is true and we block; however, in some scenarios blocking is not desirable and it
+ * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
+ * a BufferExhaustedException if a record is sent and the buffer space is full.
+ */
+ public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
+ this.blockOnBufferFull = blockOnBufferFull;
+ }
+
+ public Integer getRequestRequiredAcks() {
return requestRequiredAcks;
}
/**
- * This value controls when a produce request is considered completed. Specifically,
- * how many other brokers must have committed the data to their log and acknowledged this to the leader?
- * Typical values are (0, 1 or -1):
- * 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7).
- * This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
- * 1, which means that the producer gets an acknowledgement after the leader replica has received the data.
- * This option provides better durability as the client waits until the server acknowledges the request as successful
- * (only messages that were written to the now-dead leader but not yet replicated will be lost).
- * -1, The producer gets an acknowledgement after all in-sync replicas have received the data.
- * This option provides the greatest level of durability.
- * However, it does not completely eliminate the risk of message loss because the number of in sync replicas may,
- * in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas
- * (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting.
- * Please read the Replication section of the design documentation for a more in-depth discussion.
+ * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
+ * This controls the durability of records that are sent. The following settings are common:
+ * acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all.
+ * The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server
+ * has received the record in this case, and the retries configuration will not take effect (as the client won't generally
+ * know of any failures). The offset given back for each record will always be set to -1.
+ * acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement
+ * from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have
+ * replicated it then the record will be lost.
+ * acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the
+ * record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
*/
- public void setRequestRequiredAcks(short requestRequiredAcks) {
+ public void setRequestRequiredAcks(Integer requestRequiredAcks) {
this.requestRequiredAcks = requestRequiredAcks;
}
- public Integer getRequestTimeoutMs() {
- return requestTimeoutMs;
+ public Integer getRetries() {
+ return retries;
}
/**
- * The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.
+ * Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
+ * Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially
+ * change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second
+ * succeeds, then the second record may appear first.
*/
- public void setRequestTimeoutMs(Integer requestTimeoutMs) {
- this.requestTimeoutMs = requestTimeoutMs;
+ public void setRetries(Integer retries) {
+ this.retries = retries;
}
- public Integer getQueueBufferingMaxMs() {
- return queueBufferingMaxMs;
+ public Integer getProducerBatchSize() {
+ return producerBatchSize;
}
/**
- * Maximum time to buffer data when using async mode.
- * For example a setting of 100 will try to batch together 100ms of messages to send at once.
- * This will improve throughput but adds message delivery latency due to the buffering.
+ * The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.
+ * This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
+ * No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each
+ * partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero
+ * will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the
+ * specified batch size in anticipation of additional records.
*/
- public void setQueueBufferingMaxMs(Integer queueBufferingMaxMs) {
- this.queueBufferingMaxMs = queueBufferingMaxMs;
+ public void setProducerBatchSize(Integer producerBatchSize) {
+ this.producerBatchSize = producerBatchSize;
}
- public Integer getQueueBufferingMaxMessages() {
- return queueBufferingMaxMessages;
+ public Integer getConnectionMaxIdleMs() {
+ return connectionMaxIdleMs;
}
/**
- * The maximum number of unsent messages that can be queued up the producer when using async
- * mode before either the producer must be blocked or data must be dropped.
+ * Close idle connections after the number of milliseconds specified by this config.
*/
- public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
- this.queueBufferingMaxMessages = queueBufferingMaxMessages;
+ public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
+ this.connectionMaxIdleMs = connectionMaxIdleMs;
}
- public Integer getQueueEnqueueTimeoutMs() {
- return queueEnqueueTimeoutMs;
+ public Integer getLingerMs() {
+ return lingerMs;
}
/**
- * The amount of time to block before dropping messages when running in async mode and the buffer has reached
- * queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full
- * (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send.
+ * The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this
+ * occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce
+ * the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is,
+ * rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that
+ * the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on
+ * the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting,
+ * however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more
+ * records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the
+ * number of requests sent but would add up to 5ms of latency to records sent in the absence of load.
*/
- public void setQueueEnqueueTimeoutMs(Integer queueEnqueueTimeoutMs) {
- this.queueEnqueueTimeoutMs = queueEnqueueTimeoutMs;
+ public void setLingerMs(Integer lingerMs) {
+ this.lingerMs = lingerMs;
}
- public Integer getBatchNumMessages() {
- return batchNumMessages;
+ public Integer getMaxBlockMs() {
+ return maxBlockMs;
}
/**
- * The number of messages to send in one batch when using async mode.
- * The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached.
+ * The configuration controls how long sending to Kafka will block. The send() and partitionsFor() methods can
+ * block for multiple reasons, for example: the buffer is full or metadata is unavailable. This configuration imposes a maximum limit on the total time spent
+ * in fetching metadata, serialization of key and value, partitioning and allocation of buffer memory when doing a send(). In case of
+ * partitionsFor(), this configuration imposes a maximum time threshold on waiting for metadata.
*/
- public void setBatchNumMessages(Integer batchNumMessages) {
- this.batchNumMessages = batchNumMessages;
+ public void setMaxBlockMs(Integer maxBlockMs) {
+ this.maxBlockMs = maxBlockMs;
}
- public String getSerializerClass() {
- return serializerClass;
+ public Integer getMaxRequestSize() {
+ return maxRequestSize;
}
/**
- * The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].
- * The default class is kafka.serializer.DefaultEncoder
+ * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size
+ * which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid
+ * sending huge requests.
*/
- public void setSerializerClass(String serializerClass) {
- this.serializerClass = serializerClass;
+ public void setMaxRequestSize(Integer maxRequestSize) {
+ this.maxRequestSize = maxRequestSize;
}
- public String getKeySerializerClass() {
- return keySerializerClass;
+ public Integer getReceiveBufferBytes() {
+ return receiveBufferBytes;
}
/**
- * The serializer class for keys (defaults to the same as for messages if nothing is given).
+ * The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
*/
- public void setKeySerializerClass(String keySerializerClass) {
- this.keySerializerClass = keySerializerClass;
+ public void setReceiveBufferBytes(Integer receiveBufferBytes) {
+ this.receiveBufferBytes = receiveBufferBytes;
}
- public String getOffsetsStorage() {
- return offsetsStorage;
+ public Integer getTimeoutMs() {
+ return timeoutMs;
}
/**
- * Select where offsets should be stored (zookeeper or kafka).
+ * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
+ * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
+ * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
+ * the network latency of the request.
*/
- public void setOffsetsStorage(String offsetsStorage) {
- this.offsetsStorage = offsetsStorage;
+ public void setTimeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
}
- public Boolean isDualCommitEnabled() {
- return dualCommitEnabled;
+ public Integer getMaxInFlightRequest() {
+ return maxInFlightRequest;
}
/**
- * If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka).
- * This is required during migration from zookeeper-based offset storage to kafka-based offset storage.
- * With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated
- * to the new version that commits offsets to the broker (instead of directly to ZooKeeper).
+ * The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting
+ * is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
*/
- public void setDualCommitEnabled(Boolean dualCommitEnabled) {
- this.dualCommitEnabled = dualCommitEnabled;
+ public void setMaxInFlightRequest(Integer maxInFlightRequest) {
+ this.maxInFlightRequest = maxInFlightRequest;
+ }
+
+ public Integer getMetadataFetchTimeoutMs() {
+ return metadataFetchTimeoutMs;
}
/**
- * Returns a copy of this configuration
+ * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
+ * This configuration specifies the maximum time, in milliseconds, for this fetch to succeed before an exception is thrown back to the client.
*/
- public KafkaConfiguration copy() {
- try {
- return (KafkaConfiguration)clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeCamelException(e);
- }
+ public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
+ this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
+ }
+
+ public Integer getMetadataMaxAgeMs() {
+ return metadataMaxAgeMs;
+ }
+
+ /**
+ * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership
+ * changes to proactively discover any new brokers or partitions.
+ */
+ public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
+ this.metadataMaxAgeMs = metadataMaxAgeMs;
+ }
+
+ public String getMetricReporters() {
+ return metricReporters;
+ }
+
+ /**
+ * A list of classes to use as metrics reporters. Implementing the MetricsReporter interface allows plugging in classes that will be
+ * notified of new metric creation. The JmxReporter is always included to register JMX statistics.
+ */
+ public void setMetricReporters(String metricReporters) {
+ this.metricReporters = metricReporters;
+ }
+
+ public Integer getNoOfMetricsSample() {
+ return noOfMetricsSample;
+ }
+
+ /**
+ * The number of samples maintained to compute metrics.
+ */
+ public void setNoOfMetricsSample(Integer noOfMetricsSample) {
+ this.noOfMetricsSample = noOfMetricsSample;
+ }
+
+ public Integer getMetricsSampleWindowMs() {
+ return metricsSampleWindowMs;
+ }
+
+ /**
+ * The window of time a metrics sample is computed over.
+ */
+ public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
+ this.metricsSampleWindowMs = metricsSampleWindowMs;
+ }
+
+ public Integer getReconnectBackoffMs() {
+ return reconnectBackoffMs;
+ }
+
+ /**
+ * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host
+ * in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+ */
+ public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
+ this.reconnectBackoffMs = reconnectBackoffMs;
+ }
+
+ public Integer getHeartbeatIntervalMs() {
+ return heartbeatIntervalMs;
+ }
+
+ /**
+ * The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
+ * Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new
+ * consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set
+ * no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
+ */
+ public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
+ this.heartbeatIntervalMs = heartbeatIntervalMs;
+ }
+
+ public Integer getMaxPartitionFetchBytes() {
+ return maxPartitionFetchBytes;
+ }
+
+ /**
+ * The maximum amount of data per-partition the server will return. The maximum total memory used for
+ * a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the
+ * maximum message size the server allows or else it is possible for the producer to send messages larger
+ * than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message
+ * on a certain partition.
+ */
+ public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
+ this.maxPartitionFetchBytes = maxPartitionFetchBytes;
+ }
+
+ public Integer getSessionTimeoutMs() {
+ return sessionTimeoutMs;
+ }
+
+ /**
+ * The timeout used to detect failures when using Kafka's group management facilities.
+ */
+ public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
+ this.sessionTimeoutMs = sessionTimeoutMs;
+ }
+
+ public String getPartitionAssignor() {
+ return partitionAssignor;
+ }
+
+ /**
+ * The class name of the partition assignment strategy that the client will use to distribute
+ * partition ownership amongst consumer instances when group management is used.
+ */
+ public void setPartitionAssignor(String partitionAssignor) {
+ this.partitionAssignor = partitionAssignor;
+ }
+
+ public Integer getConsumerRequestTimeoutMs() {
+ return consumerRequestTimeoutMs;
+ }
+
+ /**
+ * The configuration controls the maximum amount of time the client will wait for the response
+ * of a request. If the response is not received before the timeout elapses the client will resend
+ * the request if necessary or fail the request if retries are exhausted.
+ */
+ public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
+ this.consumerRequestTimeoutMs = consumerRequestTimeoutMs;
+ }
+
+ public Boolean getCheckCrcs() {
+ return checkCrcs;
+ }
+
+ /**
+ * Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk
+ * corruption to the messages occurred. This check adds some overhead, so it may be disabled in
+ * cases seeking extreme performance.
+ */
+ public void setCheckCrcs(Boolean checkCrcs) {
+ this.checkCrcs = checkCrcs;
+ }
+
+ public String getKeyDeserializer() {
+ return keyDeserializer;
+ }
+
+ /**
+ * Deserializer class for key that implements the Deserializer interface.
+ */
+ public void setKeyDeserializer(String keyDeserializer) {
+ this.keyDeserializer = keyDeserializer;
+ }
+
+ public String getValueDeserializer() {
+ return valueDeserializer;
+ }
+
+ /**
+ * Deserializer class for value that implements the Deserializer interface.
+ */
+ public void setValueDeserializer(String valueDeserializer) {
+ this.valueDeserializer = valueDeserializer;
}
}
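
For readers following the migration, here is a minimal sketch of how the producer options above surface on a Camel endpoint URI. It assumes Camel's usual convention that each URI query option maps to the matching KafkaConfiguration setter (requestRequiredAcks, retries, lingerMs); the broker address, topic and route URIs are illustrative only, not taken from this commit:

    import org.apache.camel.builder.RouteBuilder;

    public class KafkaProducerRoute extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            // Producer with acks=1, three retries and a 5 ms linger to encourage batching.
            // Option names assume the setter-to-URI mapping described above.
            from("direct:start")
                .to("kafka:localhost:9092?topic=test"
                    + "&requestRequiredAcks=1"
                    + "&retries=3"
                    + "&lingerMs=5");
        }
    }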
http://git-wip-us.apache.org/repos/asf/camel/blob/b2aa831d/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
index d3ff482..db99a09 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConstants.java
@@ -16,21 +16,20 @@
*/
package org.apache.camel.component.kafka;
-/**
- *
- */
public final class KafkaConstants {
- public static final String DEFAULT_GROUP = "group1";
-
public static final String PARTITION_KEY = "kafka.PARTITION_KEY";
- public static final String PARTITION = "kafka.PARTITION";
- public static final String KEY = "kafka.KEY";
+ public static final String PARTITION = "kafka.EXCHANGE_NAME";
+ public static final String KEY = "kafka.CONTENT_TYPE";
public static final String TOPIC = "kafka.TOPIC";
public static final String OFFSET = "kafka.OFFSET";
public static final String KAFKA_DEFAULT_ENCODER = "kafka.serializer.DefaultEncoder";
public static final String KAFKA_STRING_ENCODER = "kafka.serializer.StringEncoder";
+ public static final String KAFKA_DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ public static final String KAFKA_DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ public static final String KAFKA_DEFAULT_PARTITIONER = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
+ public static final String PARTITIONER_RANGE_ASSIGNOR = "org.apache.kafka.clients.consumer.RangeAssignor";
private KafkaConstants() {
// Utility class
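
To illustrate the header constants above, a hedged sketch of a producer route that picks the partition and record key per message; the header semantics are assumed from the constant names, and the route and values are illustrative only:

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;

    public class KafkaHeaderRoute extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        // Assumed semantics: PARTITION_KEY selects the target partition,
                        // KEY becomes the Kafka record key.
                        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, "0");
                        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
                    }
                })
                .to("kafka:localhost:9092?topic=test");
        }
    }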