You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/17 00:38:03 UTC
[1/2] kafka git commit: kafka-2264;
SESSION_TIMEOUT_MS_CONFIG in ConsumerConfig should be int;
patched by Manikumar Reddy; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 28ecea421 -> 478505632
kafka-2264; SESSION_TIMEOUT_MS_CONFIG in ConsumerConfig should be int; patched by Manikumar Reddy; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7009f1d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7009f1d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7009f1d6
Branch: refs/heads/trunk
Commit: 7009f1d6fffe3866723d1d33a28a4572053eb4e5
Parents: 28ecea4
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Tue Jun 16 15:30:52 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 16 15:30:52 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/consumer/ConsumerConfig.java | 2 +-
.../java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../apache/kafka/clients/consumer/internals/Coordinator.java | 6 +++---
.../kafka/clients/consumer/internals/CoordinatorTest.java | 2 +-
4 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1e90524..daff34d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -167,7 +167,7 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
- Type.LONG,
+ Type.INT,
30000,
Importance.HIGH,
SESSION_TIMEOUT_MS_DOC)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d1d1ec1..951c34c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -484,7 +484,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.coordinator = new Coordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
this.retryBackoffMs,
- config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+ config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
this.metadata,
this.subscriptions,
http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index c1496a0..41cb945 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -69,7 +69,7 @@ public final class Coordinator {
private final String groupId;
private final Metadata metadata;
private final Heartbeat heartbeat;
- private final long sessionTimeoutMs;
+ private final int sessionTimeoutMs;
private final String assignmentStrategy;
private final SubscriptionState subscriptions;
private final CoordinatorMetrics sensors;
@@ -84,7 +84,7 @@ public final class Coordinator {
public Coordinator(KafkaClient client,
String groupId,
long retryBackoffMs,
- long sessionTimeoutMs,
+ int sessionTimeoutMs,
String assignmentStrategy,
Metadata metadata,
SubscriptionState subscriptions,
@@ -123,7 +123,7 @@ public final class Coordinator {
// repeat processing the response until succeed or fatal error
do {
JoinGroupRequest request = new JoinGroupRequest(groupId,
- (int) this.sessionTimeoutMs,
+ this.sessionTimeoutMs,
subscribedTopics,
this.consumerId,
this.assignmentStrategy);
http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index b06c4a7..1454ab7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -50,7 +50,7 @@ public class CoordinatorTest {
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
private long retryBackoffMs = 0L;
- private long sessionTimeoutMs = 10L;
+ private int sessionTimeoutMs = 10;
private String rebalanceStrategy = "not-matter";
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
[2/2] kafka git commit: kafka-2252;
Socket connection closing is logged, but not corresponding opening of
socket; patched by Gwen Shapira; reviewed by Jun Rao
Posted by ju...@apache.org.
kafka-2252; Socket connection closing is logged, but not corresponding opening of socket; patched by Gwen Shapira; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/47850563
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/47850563
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/47850563
Branch: refs/heads/trunk
Commit: 478505632edc8f4b51e4ed561d1adf455256c3e4
Parents: 7009f1d
Author: Gwen Shapira <cs...@gmail.com>
Authored: Tue Jun 16 15:37:58 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 16 15:37:58 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/kafka/common/network/Selector.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/47850563/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1da215b..4aee214 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -267,6 +267,7 @@ public class Selector implements Selectable {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
this.connected.add(transmissions.id);
this.sensors.connectionCreated.record();
+ log.debug("Connection {} created", transmissions.id);
}
/* read from any connections that have readable data */
@@ -307,7 +308,7 @@ public class Selector implements Selectable {
} catch (IOException e) {
String desc = socketDescription(channel);
if (e instanceof EOFException || e instanceof ConnectException)
- log.info("Connection {} disconnected", desc);
+ log.debug("Connection {} disconnected", desc);
else
log.warn("Error in I/O with connection to {}", desc, e);
close(transmissions.id);