Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/29 04:45:09 UTC

[pulsar] branch master updated: Some minor fixes in Kafka-IO and Client-Cli. (#3418)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 252ba22  Some minor fixes in Kafka-IO and Client-Cli. (#3418)
252ba22 is described below

commit 252ba221cc3fe17ec18be8b3f080ed0da0844d50
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Jan 29 12:45:04 2019 +0800

    Some minor fixes in Kafka-IO and Client-Cli. (#3418)
---
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  2 +-
 .../apache/pulsar/io/kafka/KafkaSinkConfig.java    |  4 +--
 site2/docs/io-kafka.md                             | 27 +++++++++----------
 site2/docs/reference-cli-tools.md                  | 30 ++++++++++++++--------
 4 files changed, 36 insertions(+), 27 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index c559494..eb8fe96 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -49,7 +49,7 @@ public class CmdProduce {
     @Parameter(description = "TopicName", required = true)
     private List<String> mainOptions;
 
-    @Parameter(names = { "-m", "--messages" }, description = "Comma separted string messages to send, "
+    @Parameter(names = { "-m", "--messages" }, description = "Comma separated string messages to send, "
             + "either -m or -f must be specified.")
     private List<String> messages = Lists.newArrayList();
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
index e6541ac..62370ed 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java
@@ -55,12 +55,12 @@ public class KafkaSinkConfig implements Serializable {
           + "before considering a request complete. This controls the durability of records that are sent.")
     private String acks;
     @FieldDoc(
-        defaultValue = "16384",
+        defaultValue = "16384L",
         help =
             "The batch size that Kafka producer will attempt to batch records together before sending them to brokers.")
     private long batchSize = 16384L;
     @FieldDoc(
-        defaultValue = "16384",
+        defaultValue = "1048576L",
         help =
             "The maximum size of a Kafka request in bytes.")
     private long maxRequestSize = 1048576L;
diff --git a/site2/docs/io-kafka.md b/site2/docs/io-kafka.md
index 5eab2c1..a0ea59c 100644
--- a/site2/docs/io-kafka.md
+++ b/site2/docs/io-kafka.md
@@ -15,14 +15,14 @@ to a Pulsar topic.
 |------|----------|---------|-------------|
 | bootstrapServers | `true` | `null` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
 | groupId | `true` | `null` | A unique string that identifies the consumer group this consumer belongs to. |
-| fetchMinBytes | `false` | `null` | Minimum bytes expected for each fetch response. |
-| autoCommitEnabled | `false` | `false` | If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. This committed offset will be used when the process fails as the position from which the new consumer will begin. | 
-| autoCommitIntervalMs | `false` | `null` | The frequency in ms that the consumer offsets are committed to zookeeper. |
+| fetchMinBytes | `false` | `1` | Minimum bytes expected for each fetch response. |
+| autoCommitEnabled | `false` | `true` | If true, the consumer's offset will be periodically committed in the background. This committed offset will be used when the process fails as the position from which the new consumer will begin. |
+| autoCommitIntervalMs | `false` | `5000` | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
 | heartbeatIntervalMs | `false` | `3000` | The interval between heartbeats to the consumer when using Kafka's group management facilities. |
-| sessionTimeoutMs | `false` | `null` | The timeout used to detect consumer failures when using Kafka's group management facility. |
-| topic | `true` | `null` | Topic name to receive records from Kafka |
-| keySerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface. |
-| valueSerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
+| sessionTimeoutMs | `false` | `30000` | The timeout used to detect consumer failures when using Kafka's group management facility. |
+| topic | `true` | `null` | Topic name to receive records from Kafka. |
+| keyDeserializationClass | `false` | `org.apache.kafka.common.serialization.StringDeserializer` | Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface. |
+| valueDeserializationClass | `false` | `org.apache.kafka.common.serialization.ByteArrayDeserializer` | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. |
 
 ## Sink
 
@@ -33,9 +33,10 @@ to a Kafka topic.
 
 | Name | Required | Default | Description |
 |------|----------|---------|-------------|
-| acks | `true` | `null` | The kafka producer acks mode |
-| batchSize | `true` | `null` | The kafka producer batch size. |
-| maxRequestSize | `true` | `null` | The maximum size of a request in bytes. |
-| topic | `true` | `null` | Topic name to receive records from Kafka |
-| keySerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
-| valueSerializerClass | false | org.apache.kafka.common.serialization.StringSerializer | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
+| bootstrapServers | `true` | `null` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. |
+| acks | `true` | `null` | The kafka producer acks mode. |
+| batchSize | `false` | `16384` | The kafka producer batch size. |
+| maxRequestSize | `false` | `1048576` | The maximum size of a request in bytes. |
+| topic | `true` | `null` | Topic name to receive records from Kafka. |
+| keySerializerClass | `false` | `org.apache.kafka.common.serialization.StringSerializer` | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
+| valueSerializerClass | `false` | `org.apache.kafka.common.serialization.ByteArraySerializer` | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface. |
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index fa3a45a..96b0bc1 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -323,7 +323,7 @@ Options
 |---|---|---|
 |`--hex`|Display binary messages in hexadecimal format.|false|
 |`-n`, `--num-messages`|Number of messages to consume, 0 means to consume forever.|0|
-|`-r`, `--rate`|Rate (in messages per second) at which to produce; a value 0 means to produce messages as fast as possible|0.0|
+|`-r`, `--rate`|Rate (in messages per second) at which to consume; a value 0 means to consume messages as fast as possible|0.0|
 |`-s`, `--subscription-name`|Subscription name||
 |`-t`, `--subscription-type`|The type of the subscription. Possible values: Exclusive, Shared, Failover.|Exclusive|
 
@@ -383,6 +383,7 @@ Commands
 * `simulation-controller`
 
 Environment variables
+
 The table below lists the environment variables that you can use to configure the pulsar-perf tool.
 
 |Variable|Description|Default|
@@ -407,18 +408,21 @@ Options
 |`--auth_params`|Authentication parameters in the form of key1:val1,key2:val2||
 |`--auth_plugin`|Authentication plugin class name||
 |`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
+|`--acks-delay-millis`|Acknowlegments grouping delay in millis|100|
+|`-k`, `--encryption-key-name`|The private key name to decrypt payload||
+|`-v`, `--encryption-key-value-file`|The file which contains the private key to decrypt payload||
 |`--conf-file`|Configuration file||
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|0|
-|`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
-|`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0|
-|`-n`, `--num-producers`|The number of producers (per topic)|1|
+|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
+|`-n`, `--num-consumers`|Number of consumers (per topic)|1|
 |`-t`, `--num-topic`|The number of topics|1|
-|`-f`, `--payload-file`|Use payload from a file instead of an empty buffer||
-|`-r`, `--rate`|Publish rate msg/s across topics|100|
+|`-r`, `--rate`|Simulate a slow message consumer (rate in msg/s)|0|
+|`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
 |`-u`, `--service-url`|Pulsar service URL||
-|`-s`, `--size`|Message size (in bytes)|1024|
-|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
-|`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
+|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
+|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
+|`-st`, `--subscription-type`|Subscriber name prefix. Possible values are Exclusive, Shared, Failover.|Exclusive|
+|`--trust-cert-file`|Path for the trusted TLS certificate file||
+|`--use-tls`|Use TLS encryption on the connection|false|
 
 
 ### `produce`
@@ -437,7 +441,7 @@ Options
 |`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
 |`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB or ZSTD.||
 |`--conf-file`|Configuration file||
-|`-c`, `--max-connections`|Max number of TCP connections to a single broker|0|
+|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
 |`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
 |`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0|
 |`-n`, `--num-producers`|The number of producers (per topic)|1|
@@ -448,6 +452,9 @@ Options
 |`-s`, `--size`|Message size (in bytes)|1024|
 |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
 |`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
+|`--trust-cert-file`|Path for the trusted TLS certificate file||
+|`--use-tls`|Use TLS encryption on the connection|false|
+|`--warmup-time`|Warm-up time in seconds |1|
 
 
 
@@ -507,6 +514,7 @@ Commands
 
 
 Environment variables
+
 The table below lists the environment variables that you can use to configure the bookkeeper tool.
 
 |Variable|Description|Default|