You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/20 11:27:32 UTC
[07/50] [abbrv] kylin git commit: add check on kafka topic and broker info
add check on kafka topic and broker info
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d11d019c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d11d019c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d11d019c
Branch: refs/heads/master-cdh5.7
Commit: d11d019c98e11bf058381752d605f842a8eb30b1
Parents: a4ddbbd
Author: shaofengshi <sh...@apache.org>
Authored: Mon Dec 12 16:22:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Dec 12 16:22:15 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/source/kafka/KafkaConfigManager.java | 8 ++++++++
.../java/org/apache/kylin/source/kafka/util/KafkaClient.java | 5 +++++
2 files changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d11d019c/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index e76422c..775f052 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -191,6 +191,14 @@ public class KafkaConfigManager {
throw new IllegalArgumentException();
}
+ if (StringUtils.isEmpty(kafkaConfig.getTopic())) {
+ throw new IllegalArgumentException("No topic info");
+ }
+
+ if (kafkaConfig.getKafkaClusterConfigs() == null || kafkaConfig.getKafkaClusterConfigs().size() ==0) {
+ throw new IllegalArgumentException("No cluster info");
+ }
+
String path = KafkaConfig.concatResourcePath(kafkaConfig.getName());
getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d11d019c/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 2a7b0e8..3b970b3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -18,6 +18,7 @@
package org.apache.kylin.source.kafka.util;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
@@ -96,6 +97,10 @@ public class KafkaClient {
}
}
}
+
+ if (StringUtils.isEmpty(brokers)) {
+ throw new IllegalArgumentException("No cluster info in Kafka config '" + kafkaConfig.getName() + "'");
+ }
return brokers;
}