You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/04/08 15:08:12 UTC

camel git commit: CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties)

Repository: camel
Updated Branches:
  refs/heads/master fd659c108 -> d84cc7005


CAMEL-9818: Camel kafka consumer adds legacy (deprecated properties)


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d84cc700
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d84cc700
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d84cc700

Branch: refs/heads/master
Commit: d84cc70056160fdf98b286c52a9d558663d8e8e1
Parents: fd659c1
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri Apr 8 15:03:50 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Apr 8 15:05:11 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc |  9 ++--
 .../component/kafka/KafkaConfiguration.java     | 54 +-------------------
 .../camel/component/kafka/KafkaEndpoint.java    | 24 ---------
 .../component/kafka/KafkaComponentTest.java     |  6 ---
 4 files changed, 5 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 7528eb9..0f23849 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -65,8 +65,9 @@ The Kafka component has no options.
 
 
 
+
 // endpoint options: START
-The Kafka component supports 74 endpoint options which are listed below:
+The Kafka component supports 71 endpoint options which are listed below:
 
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -76,7 +77,7 @@ The Kafka component supports 74 endpoint options which are listed below:
 | clientId | common |  | String | The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.
 | groupId | common |  | String | A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group.
 | partitioner | common | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.
-| topic | common |  | String | *Required* Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+| topic | common |  | String | *Required* Name of the topic to use.
 | autoCommitEnable | consumer | true | Boolean | If true periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin.
 | autoCommitIntervalMs | consumer | 5000 | Integer | The frequency in ms that the consumer offsets are committed to zookeeper.
 | autoOffsetReset | consumer | latest | String | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset fail: throw exception to the consumer
@@ -98,7 +99,6 @@ The Kafka component supports 74 endpoint options which are listed below:
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| blockOnBufferFull | producer | false | Boolean | When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default this setting is true and we block however in some scenarios blocking is not desirable and it is better to immediately give an error. Setting this to false will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full.
 | bufferMemorySize | producer | 33554432 | Integer | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.
 | compressionCodec | producer | none | String | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy.
 | connectionMaxIdleMs | producer | 540000 | Integer | Close idle connections after the number of milliseconds specified by this config.
@@ -111,7 +111,6 @@ The Kafka component supports 74 endpoint options which are listed below:
 | maxBlockMs | producer | 60000 | Integer | The configuration controls how long sending to kafka will block. These methods can be blocked for multiple reasons. For e.g: buffer full metadata unavailable.This configuration imposes maximum limit on the total time spent in fetching metadata serialization of key and value partitioning and allocation of buffer memory when doing a send(). In case of partitionsFor() this configuration imposes a maximum time threshold on waiting for metadata
 | maxInFlightRequest | producer | 5 | Integer | The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends there is a risk of message re-ordering due to retries (i.e. if retries are enabled).
 | maxRequestSize | producer | 1048576 | Integer | The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
-| metadataFetchTimeoutMs | producer | 60000 | Integer | The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This fetch to succeed before throwing an exception back to the client.
 | metadataMaxAgeMs | producer | 300000 | Integer | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
 | metricReporters | producer |  | String | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.
 | metricsSampleWindowMs | producer | 30000 | Integer | The number of samples maintained to compute metrics.
@@ -142,13 +141,13 @@ The Kafka component supports 74 endpoint options which are listed below:
 | sslTruststoreLocation | producer |  | String | The location of the trust store file.
 | sslTruststorePassword | producer |  | String | The password for the trust store file.
 | sslTruststoreType | producer | JKS | String | The file format of the trust store file. Default value is JKS.
-| timeoutMs | producer | 30000 | Integer | The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================
 // endpoint options: END
 
 
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 49d7fe2..e0580e9 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -184,18 +184,9 @@ public class KafkaConfiguration {
     //ssl.truststore.type
     @UriParam(label = "producer", defaultValue = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE)
     private String sslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE;
-    //timeout.ms
-    @UriParam(label = "producer", defaultValue = "30000")
-    private Integer timeoutMs = 30000;
-    //block.on.buffer.full
-    @UriParam(label = "producer", defaultValue = "false")
-    private Boolean blockOnBufferFull = false;
     //max.in.flight.requests.per.connection
     @UriParam(label = "producer", defaultValue = "5")
     private Integer maxInFlightRequest = 5;
-    //metadata.fetch.timeout.ms
-    @UriParam(label = "producer", defaultValue = "60000")
-    private Integer metadataFetchTimeoutMs = 600 * 1000;
     //metadata.max.age.ms
     @UriParam(label = "producer", defaultValue = "300000")
     private Integer metadataMaxAgeMs = 300000;
@@ -276,10 +267,7 @@ public class KafkaConfiguration {
         addPropertyIfNotNull(props, SslConfigs.SSL_PROTOCOL_CONFIG, getSslProtocol());
         addPropertyIfNotNull(props, SslConfigs.SSL_PROVIDER_CONFIG, getSslProvider());
         addPropertyIfNotNull(props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, getSslTruststoreType());
-        addPropertyIfNotNull(props, ProducerConfig.TIMEOUT_CONFIG, getTimeoutMs());
-        addPropertyIfNotNull(props, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, getBlockOnBufferFull());
         addPropertyIfNotNull(props, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, getMaxInFlightRequest());
-        addPropertyIfNotNull(props, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, getMetadataFetchTimeoutMs());
         addPropertyIfNotNull(props, ProducerConfig.METADATA_MAX_AGE_CONFIG, getMetadataMaxAgeMs());
         addPropertyIfNotNull(props, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, getMetricReporters());
         addPropertyIfNotNull(props, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, getNoOfMetricsSample());
@@ -389,7 +377,7 @@ public class KafkaConfiguration {
     }
 
     /**
-     * Name of the topic to use. When used on a consumer endpoint the topic can be a comma separated list of topics.
+     * Name of the topic to use.
      */
     public void setTopic(String topic) {
         this.topic = topic;
@@ -870,20 +858,6 @@ public class KafkaConfiguration {
         this.bufferMemorySize = bufferMemorySize;
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return blockOnBufferFull;
-    }
-
-    /**
-     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors.
-     * By default this setting is true and we block, however in some scenarios blocking is not desirable and it
-     * is better to immediately give an error. Setting this to false will accomplish that: the producer will throw
-     * a BufferExhaustedException if a recrord is sent and the buffer space is full.
-     */
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        this.blockOnBufferFull = blockOnBufferFull;
-    }
-
     public Integer getRequestRequiredAcks() {
         return requestRequiredAcks;
     }
@@ -1003,20 +977,6 @@ public class KafkaConfiguration {
         this.receiveBufferBytes = receiveBufferBytes;
     }
 
-    public Integer getTimeoutMs() {
-        return timeoutMs;
-    }
-
-    /**
-     * The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the
-     * acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments
-     * are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include
-     * the network latency of the request.
-     */
-    public void setTimeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-    }
-
     public Integer getMaxInFlightRequest() {
         return maxInFlightRequest;
     }
@@ -1029,18 +989,6 @@ public class KafkaConfiguration {
         this.maxInFlightRequest = maxInFlightRequest;
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return metadataFetchTimeoutMs;
-    }
-
-    /**
-     * The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions.
-     * This fetch to succeed before throwing an exception back to the client.
-     */
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        this.metadataFetchTimeoutMs = metadataFetchTimeoutMs;
-    }
-
     public Integer getMetadataMaxAgeMs() {
         return metadataMaxAgeMs;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 327ecdc..1c239c8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -283,10 +283,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setBrokers(brokers);
     }
 
-    public void setMetadataFetchTimeoutMs(Integer metadataFetchTimeoutMs) {
-        configuration.setMetadataFetchTimeoutMs(metadataFetchTimeoutMs);
-    }
-
     public String getValueDeserializer() {
         return configuration.getValueDeserializer();
     }
@@ -375,10 +371,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeystorePassword(sslKeystorePassword);
     }
 
-    public Boolean getBlockOnBufferFull() {
-        return configuration.getBlockOnBufferFull();
-    }
-
     public void setCheckCrcs(Boolean checkCrcs) {
         configuration.setCheckCrcs(checkCrcs);
     }
@@ -415,10 +407,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         configuration.setSslKeyPassword(sslKeyPassword);
     }
 
-    public void setBlockOnBufferFull(Boolean blockOnBufferFull) {
-        configuration.setBlockOnBufferFull(blockOnBufferFull);
-    }
-
     public Integer getRequestRequiredAcks() {
         return configuration.getRequestRequiredAcks();
     }
@@ -495,10 +483,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSslTruststorePassword();
     }
 
-    public void setTimeoutMs(Integer timeoutMs) {
-        configuration.setTimeoutMs(timeoutMs);
-    }
-
     public void setConsumerStreams(int consumerStreams) {
         configuration.setConsumerStreams(consumerStreams);
     }
@@ -551,10 +535,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getPartitionAssignor();
     }
 
-    public Integer getMetadataFetchTimeoutMs() {
-        return configuration.getMetadataFetchTimeoutMs();
-    }
-
     public void setSecurityProtocol(String securityProtocol) {
         configuration.setSecurityProtocol(securityProtocol);
     }
@@ -655,10 +635,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return configuration.getSendBufferBytes();
     }
 
-    public Integer getTimeoutMs() {
-        return configuration.getTimeoutMs();
-    }
-
     public String getSslProtocol() {
         return configuration.getSslProtocol();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d84cc700/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 31c2dd6..1c2c564 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -64,21 +64,18 @@ public class KafkaComponentTest {
         assertEquals(new Integer(10), endpoint.getProducerBatchSize());
         assertEquals(new Integer(12), endpoint.getConnectionMaxIdleMs());
         assertEquals(new Integer(1), endpoint.getMaxBlockMs());
-        assertEquals(false, endpoint.getBlockOnBufferFull());
         assertEquals(new Integer(1), endpoint.getBufferMemorySize());
         assertEquals("testing", endpoint.getClientId());
         assertEquals("none", endpoint.getCompressionCodec());
         assertEquals(new Integer(1), endpoint.getLingerMs());
         assertEquals(new Integer(100), endpoint.getMaxRequestSize());
         assertEquals(100, endpoint.getRequestTimeoutMs().intValue());
-        assertEquals(new Integer(9043), endpoint.getMetadataFetchTimeoutMs());
         assertEquals(new Integer(1029), endpoint.getMetadataMaxAgeMs());
         assertEquals(new Integer(23), endpoint.getReceiveBufferBytes());
         assertEquals(new Integer(234), endpoint.getReconnectBackoffMs());
         assertEquals(new Integer(0), endpoint.getRetries());
         assertEquals(3782, endpoint.getRetryBackoffMs().intValue());
         assertEquals(765, endpoint.getSendBufferBytes().intValue());
-        assertEquals(new Integer(2045), endpoint.getTimeoutMs());
         assertEquals(new Integer(1), endpoint.getMaxInFlightRequest());
         assertEquals("org.apache.camel.reporters.TestReport,org.apache.camel.reporters.SampleReport", endpoint.getMetricReporters());
         assertEquals(new Integer(3), endpoint.getNoOfMetricsSample());
@@ -134,10 +131,7 @@ public class KafkaComponentTest {
         props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, "131072");
-        props.put(ProducerConfig.TIMEOUT_CONFIG, "30000");
-        props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false");
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
-        props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, "60000");
         props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
         props.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, "2");
         props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");