You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/04/20 09:32:00 UTC
[1/3] storm git commit: STORM-1680,
Added Kafka Spout Config FetchByte to Storm-Kafka
Repository: storm
Updated Branches:
refs/heads/master d593918a5 -> ce36b6dbf
STORM-1680, Added Kafka Spout Config FetchByte to Storm-Kafka
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8b0af500
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8b0af500
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8b0af500
Branch: refs/heads/master
Commit: 8b0af5004d8472117d709caa8af997c6842d982a
Parents: d593918
Author: narendra_bidari <Na...@symantec.com>
Authored: Mon Apr 4 12:28:09 2016 -0700
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 16:31:02 2016 +0900
----------------------------------------------------------------------
.../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 ++++-
.../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java | 2 +-
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
index e1e1d24..c845531 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -17,10 +17,12 @@
*/
package org.apache.storm.kafka;
+import java.io.Serializable;
+
import org.apache.storm.spout.MultiScheme;
import org.apache.storm.spout.RawMultiScheme;
-import java.io.Serializable;
+import kafka.api.FetchRequest;
public class KafkaConfig implements Serializable {
private static final long serialVersionUID = 5276718734571623855L;
@@ -39,6 +41,7 @@ public class KafkaConfig implements Serializable {
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
+ public int minFetchByte = FetchRequest.DefaultMinBytes();
public KafkaConfig(BrokerHosts hosts, String topic) {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
http://git-wip-us.apache.org/repos/asf/storm/blob/8b0af500/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 7fa4340..090b6d1 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -189,7 +189,7 @@ public class KafkaUtils {
int partitionId = partition.partition;
FetchRequestBuilder builder = new FetchRequestBuilder();
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
- clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+ clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
FetchResponse fetchResponse;
try {
fetchResponse = consumer.fetch(fetchRequest);
[2/3] storm git commit: Merge branch 'STORM-1680'
Posted by ka...@apache.org.
Merge branch 'STORM-1680'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8457812
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8457812
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8457812
Branch: refs/heads/master
Commit: c8457812805b658d895445ce58d2583d1cdfaa59
Parents: d593918 8b0af50
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 20 16:31:10 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 16:31:10 2016 +0900
----------------------------------------------------------------------
.../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java | 5 ++++-
.../storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java | 2 +-
2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-1680 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1680 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce36b6db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce36b6db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce36b6db
Branch: refs/heads/master
Commit: ce36b6dbf0372c704964e5e854f7863960447a49
Parents: c845781
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Apr 20 16:31:42 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Apr 20 16:31:42 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ce36b6db/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0fb0134..b02c866 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -78,6 +78,7 @@
* STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
## 1.0.1
+ * STORM-1680: Provide configuration to set min fetch size in KafkaSpout
* STORM-1649: Optimize Kryo instaces creation in trident windowing
* STORM-1696: status not sync if zk fails in backpressure
* STORM-1693: Move stats cleanup to executor shutdown