You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/17 00:38:03 UTC

[1/2] kafka git commit: kafka-2264; SESSION_TIMEOUT_MS_CONFIG in ConsumerConfig should be int; patched by Manikumar Reddy; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 28ecea421 -> 478505632


kafka-2264; SESSION_TIMEOUT_MS_CONFIG in ConsumerConfig should be int; patched by Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7009f1d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7009f1d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7009f1d6

Branch: refs/heads/trunk
Commit: 7009f1d6fffe3866723d1d33a28a4572053eb4e5
Parents: 28ecea4
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Tue Jun 16 15:30:52 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 16 15:30:52 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/consumer/ConsumerConfig.java | 2 +-
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java  | 2 +-
 .../apache/kafka/clients/consumer/internals/Coordinator.java   | 6 +++---
 .../kafka/clients/consumer/internals/CoordinatorTest.java      | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1e90524..daff34d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -167,7 +167,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
                                 .define(SESSION_TIMEOUT_MS_CONFIG,
-                                        Type.LONG,
+                                        Type.INT,
                                         30000,
                                         Importance.HIGH,
                                         SESSION_TIMEOUT_MS_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d1d1ec1..951c34c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -484,7 +484,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.coordinator = new Coordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                     this.retryBackoffMs,
-                    config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                     config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                     this.metadata,
                     this.subscriptions,

http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index c1496a0..41cb945 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -69,7 +69,7 @@ public final class Coordinator {
     private final String groupId;
     private final Metadata metadata;
     private final Heartbeat heartbeat;
-    private final long sessionTimeoutMs;
+    private final int sessionTimeoutMs;
     private final String assignmentStrategy;
     private final SubscriptionState subscriptions;
     private final CoordinatorMetrics sensors;
@@ -84,7 +84,7 @@ public final class Coordinator {
     public Coordinator(KafkaClient client,
                        String groupId,
                        long retryBackoffMs,
-                       long sessionTimeoutMs,
+                       int sessionTimeoutMs,
                        String assignmentStrategy,
                        Metadata metadata,
                        SubscriptionState subscriptions,
@@ -123,7 +123,7 @@ public final class Coordinator {
         // repeat processing the response until succeed or fatal error
         do {
             JoinGroupRequest request = new JoinGroupRequest(groupId,
-                (int) this.sessionTimeoutMs,
+                this.sessionTimeoutMs,
                 subscribedTopics,
                 this.consumerId,
                 this.assignmentStrategy);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7009f1d6/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index b06c4a7..1454ab7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -50,7 +50,7 @@ public class CoordinatorTest {
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
     private long retryBackoffMs = 0L;
-    private long sessionTimeoutMs = 10L;
+    private int sessionTimeoutMs = 10;
     private String rebalanceStrategy = "not-matter";
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);


[2/2] kafka git commit: kafka-2252; Socket connection closing is logged, but not corresponding opening of socket; patched by Gwen Shapira; reviewed by Jun Rao

Posted by ju...@apache.org.
kafka-2252; Socket connection closing is logged, but not corresponding opening of socket; patched by Gwen Shapira; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/47850563
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/47850563
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/47850563

Branch: refs/heads/trunk
Commit: 478505632edc8f4b51e4ed561d1adf455256c3e4
Parents: 7009f1d
Author: Gwen Shapira <cs...@gmail.com>
Authored: Tue Jun 16 15:37:58 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jun 16 15:37:58 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/kafka/common/network/Selector.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/47850563/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1da215b..4aee214 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -267,6 +267,7 @@ public class Selector implements Selectable {
                         key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                         this.connected.add(transmissions.id);
                         this.sensors.connectionCreated.record();
+                        log.debug("Connection {} created", transmissions.id);
                     }
 
                     /* read from any connections that have readable data */
@@ -307,7 +308,7 @@ public class Selector implements Selectable {
                 } catch (IOException e) {
                     String desc = socketDescription(channel);
                     if (e instanceof EOFException || e instanceof ConnectException)
-                        log.info("Connection {} disconnected", desc);
+                        log.debug("Connection {} disconnected", desc);
                     else
                         log.warn("Error in I/O with connection to {}", desc, e);
                     close(transmissions.id);