You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/02 04:40:27 UTC

[10/11] camel git commit: CAMEL-8085 Fixed the CS errors of camel-kafka

CAMEL-8085 Fixed the CS errors of camel-kafka


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37f0b229
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37f0b229
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37f0b229

Branch: refs/heads/master
Commit: 37f0b2296c1b52e05999df9524d00c49df69d819
Parents: f6c5eee
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Dec 2 10:49:22 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 2 10:53:37 2014 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java     | 18 +++++++++++-------
 .../component/kafka/BaseEmbeddedKafkaTest.java   | 14 +++++++-------
 .../kafka/KafkaConsumerBatchSizeTest.java        | 12 ++++++------
 .../component/kafka/KafkaConsumerFullTest.java   |  4 ++--
 .../component/kafka/KafkaProducerFullTest.java   |  4 ++--
 .../kafka/embedded/EmbeddedKafkaCluster.java     | 19 +++++++++++--------
 .../kafka/embedded/EmbeddedZookeeper.java        | 10 +++++-----
 .../component/kafka/embedded/TestUtils.java      |  2 +-
 8 files changed, 45 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1813376..0165310 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,16 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
@@ -26,12 +36,6 @@ import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.*;
-
 /**
  *
  */
@@ -82,7 +86,7 @@ public class KafkaConsumer extends DefaultConsumer {
                     executor.submit(new BatchingConsumerTask(stream, barrier));
                 }
                 consumerBarriers.put(consumer, barrier);
-            } else{
+            } else {
                 for (final KafkaStream<byte[], byte[]> stream : streams) {
                     executor.submit(new AutoCommitConsumerTask(stream));
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index b181610..9b5b002 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -16,24 +16,24 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
 import org.apache.camel.component.kafka.embedded.EmbeddedKafkaCluster;
 import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
 public class BaseEmbeddedKafkaTest extends CamelTestSupport {
 
     static EmbeddedZookeeper embeddedZookeeper;
     static EmbeddedKafkaCluster embeddedKafkaCluster;
 
     @BeforeClass
-    public static void beforeClass(){
+    public static void beforeClass() {
         embeddedZookeeper = new EmbeddedZookeeper(2181);
         List<Integer> kafkaPorts = new ArrayList<Integer>();
         // -1 for any available port
@@ -50,7 +50,7 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
     }
 
     @AfterClass
-    public static void afterClass(){
+    public static void afterClass() {
         embeddedKafkaCluster.shutdown();
         embeddedZookeeper.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 2b40724..acac628 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Properties;
+
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
@@ -27,15 +29,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Properties;
-
 public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&" +
-            "groupId=group1&autoOffsetReset=smallest&" +
-            "autoCommitEnable=false&batchSize=3&consumerStreams=1")
+    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&"
+        + "groupId=group1&autoOffsetReset=smallest&"
+        + "autoCommitEnable=false&batchSize=3&consumerStreams=1")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")
@@ -91,7 +91,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
         to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
 
         //Second route must wake up and consume all from scratch and commit 9 consumed
-        for (int k = 3; k <=10; k++) {
+        for (int k = 3; k <= 10; k++) {
             String msg = "m" + k;
             KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
             producer.send(data);

http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 922ffa6..8cd17e9 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -35,8 +35,8 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
 
     public static final String TOPIC = "test";
 
-    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181" +
-            "&groupId=group1&autoOffsetReset=smallest")
+    @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181"
+        + "&groupId=group1&autoOffsetReset=smallest")
     private Endpoint from;
 
     @EndpointInject(uri = "mock:result")

http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index b86e402..95a4d8d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -51,8 +51,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class);
 
     @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC 
-        + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder" +
-            "&requestRequiredAcks=-1")
+        + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
+        + "&requestRequiredAcks=-1")
     private Endpoint to;
 
     @Produce(uri = "direct:start")

http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index bf32064..f23dd46 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -16,15 +16,18 @@
  */
 package org.apache.camel.component.kafka.embedded;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.I0Itec.zkclient.ZkClient;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.*;
-
 public class EmbeddedKafkaCluster {
     private final List<Integer> ports;
     private final String zkConnection;
@@ -53,15 +56,15 @@ public class EmbeddedKafkaCluster {
         this.brokerList = constructBrokerList(this.ports);
     }
 
-    public ZkClient getZkClient(){
-        for(KafkaServer server : brokers){
+    public ZkClient getZkClient() {
+        for (KafkaServer server : brokers) {
             return server.zkClient();
         }
         return null;
     }
 
-    public void createTopics(String...topics){
-        for(String topic : topics){
+    public void createTopics(String...topics) {
+        for (String topic : topics) {
             AdminUtils.createTopic(getZkClient(), topic, 2, 1, new Properties());
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
index ef8d55e..5441668 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
@@ -16,15 +16,15 @@
  */
 package org.apache.camel.component.kafka.embedded;
 
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
 public class EmbeddedZookeeper {
     private int port = -1;
     private int tickTime = 500;
@@ -53,7 +53,7 @@ public class EmbeddedZookeeper {
         return port;
     }
 
-    public void startup() throws IOException{
+    public void startup() throws IOException {
         if (this.port == -1) {
             this.port = TestUtils.getAvailablePort();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
index 26574e5..7a71b14 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.Random;
 
-class TestUtils {
+final class TestUtils {
     private static final Random RANDOM = new Random();
 
     private TestUtils() {