You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/12/02 04:40:27 UTC
[10/11] camel git commit: CAMEL-8085 Fixed the CS errors of
camel-kafka
CAMEL-8085 Fixed the CS errors of camel-kafka
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37f0b229
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37f0b229
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37f0b229
Branch: refs/heads/master
Commit: 37f0b2296c1b52e05999df9524d00c49df69d819
Parents: f6c5eee
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Dec 2 10:49:22 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 2 10:53:37 2014 +0800
----------------------------------------------------------------------
.../camel/component/kafka/KafkaConsumer.java | 18 +++++++++++-------
.../component/kafka/BaseEmbeddedKafkaTest.java | 14 +++++++-------
.../kafka/KafkaConsumerBatchSizeTest.java | 12 ++++++------
.../component/kafka/KafkaConsumerFullTest.java | 4 ++--
.../component/kafka/KafkaProducerFullTest.java | 4 ++--
.../kafka/embedded/EmbeddedKafkaCluster.java | 19 +++++++++++--------
.../kafka/embedded/EmbeddedZookeeper.java | 10 +++++-----
.../component/kafka/embedded/TestUtils.java | 2 +-
8 files changed, 45 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 1813376..0165310 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,16 @@
*/
package org.apache.camel.component.kafka;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
@@ -26,12 +36,6 @@ import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.*;
-
/**
*
*/
@@ -82,7 +86,7 @@ public class KafkaConsumer extends DefaultConsumer {
executor.submit(new BatchingConsumerTask(stream, barrier));
}
consumerBarriers.put(consumer, barrier);
- } else{
+ } else {
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new AutoCommitConsumerTask(stream));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index b181610..9b5b002 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -16,24 +16,24 @@
*/
package org.apache.camel.component.kafka;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.camel.component.kafka.embedded.EmbeddedKafkaCluster;
import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
public class BaseEmbeddedKafkaTest extends CamelTestSupport {
static EmbeddedZookeeper embeddedZookeeper;
static EmbeddedKafkaCluster embeddedKafkaCluster;
@BeforeClass
- public static void beforeClass(){
+ public static void beforeClass() {
embeddedZookeeper = new EmbeddedZookeeper(2181);
List<Integer> kafkaPorts = new ArrayList<Integer>();
// -1 for any available port
@@ -50,7 +50,7 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
}
@AfterClass
- public static void afterClass(){
+ public static void afterClass() {
embeddedKafkaCluster.shutdown();
embeddedZookeeper.shutdown();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
index 2b40724..acac628 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.kafka;
+import java.util.Properties;
+
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@@ -27,15 +29,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.Properties;
-
public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
public static final String TOPIC = "test";
- @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&" +
- "groupId=group1&autoOffsetReset=smallest&" +
- "autoCommitEnable=false&batchSize=3&consumerStreams=1")
+ @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&"
+ + "groupId=group1&autoOffsetReset=smallest&"
+ + "autoCommitEnable=false&batchSize=3&consumerStreams=1")
private Endpoint from;
@EndpointInject(uri = "mock:result")
@@ -91,7 +91,7 @@ public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest {
to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10");
//Second route must wake up and consume all from scratch and commit 9 consumed
- for (int k = 3; k <=10; k++) {
+ for (int k = 3; k <= 10; k++) {
String msg = "m" + k;
KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, "1", msg);
producer.send(data);
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 922ffa6..8cd17e9 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -35,8 +35,8 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest {
public static final String TOPIC = "test";
- @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181" +
- "&groupId=group1&autoOffsetReset=smallest")
+ @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181"
+ + "&groupId=group1&autoOffsetReset=smallest")
private Endpoint from;
@EndpointInject(uri = "mock:result")
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index b86e402..95a4d8d 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -51,8 +51,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class);
@EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC
- + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder" +
- "&requestRequiredAcks=-1")
+ + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder"
+ + "&requestRequiredAcks=-1")
private Endpoint to;
@Produce(uri = "direct:start")
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
index bf32064..f23dd46 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
@@ -16,15 +16,18 @@
*/
package org.apache.camel.component.kafka.embedded;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.I0Itec.zkclient.ZkClient;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.*;
-
public class EmbeddedKafkaCluster {
private final List<Integer> ports;
private final String zkConnection;
@@ -53,15 +56,15 @@ public class EmbeddedKafkaCluster {
this.brokerList = constructBrokerList(this.ports);
}
- public ZkClient getZkClient(){
- for(KafkaServer server : brokers){
+ public ZkClient getZkClient() {
+ for (KafkaServer server : brokers) {
return server.zkClient();
}
return null;
}
- public void createTopics(String...topics){
- for(String topic : topics){
+ public void createTopics(String...topics) {
+ for (String topic : topics) {
AdminUtils.createTopic(getZkClient(), topic, 2, 1, new Properties());
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
index ef8d55e..5441668 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
@@ -16,15 +16,15 @@
*/
package org.apache.camel.component.kafka.embedded;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
public class EmbeddedZookeeper {
private int port = -1;
private int tickTime = 500;
@@ -53,7 +53,7 @@ public class EmbeddedZookeeper {
return port;
}
- public void startup() throws IOException{
+ public void startup() throws IOException {
if (this.port == -1) {
this.port = TestUtils.getAvailablePort();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/37f0b229/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
index 26574e5..7a71b14 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.net.ServerSocket;
import java.util.Random;
-class TestUtils {
+final class TestUtils {
private static final Random RANDOM = new Random();
private TestUtils() {