You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/02/23 11:22:58 UTC
[1/2] kylin git commit: KYLIN-1436: If an error occurs while fetching
streaming messages, the streaming build should throw an exception
Repository: kylin
Updated Branches:
refs/heads/2.0-rc f18f7b025 -> 924421810
KYLIN-1436: If an error occurs while fetching streaming messages, the streaming build should throw an exception
Signed-off-by: honma <ho...@ebay.com>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/92442181
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/92442181
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/92442181
Branch: refs/heads/2.0-rc
Commit: 9244218103906566d50bb1eb17e6dbd8da2eee6e
Parents: 1f93bec
Author: yangzhong <ya...@ebay.com>
Authored: Tue Feb 23 18:01:17 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Feb 23 18:22:08 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/92442181/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index bcde47b..2e262b3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -90,8 +90,7 @@ public class KafkaStreamingInput implements IStreamingInput {
logger.warn("this thread should not be interrupted, just ignore", e);
continue;
} catch (ExecutionException e) {
- logger.error("error when get StreamingMessages", e.getCause());
- continue;
+ throw new RuntimeException("error when get StreamingMessages",e.getCause());
}
}
final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
[2/2] kylin git commit: KYLIN-1435: Relax the check on
PartitionMetadata and log the error code
Posted by ma...@apache.org.
KYLIN-1435: Relax the check on PartitionMetadata and log the error code
Signed-off-by: honma <ho...@ebay.com>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f93bec1
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f93bec1
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f93bec1
Branch: refs/heads/2.0-rc
Commit: 1f93bec1ca14fe0bdc2943fa9000404401f0acf5
Parents: f18f7b0
Author: yangzhong <ya...@ebay.com>
Authored: Tue Feb 23 17:37:11 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Feb 23 18:22:08 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 5 ++++-
.../java/org/apache/kylin/source/kafka/util/KafkaUtils.java | 5 ++++-
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f93bec1/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index ee5a555..bcde47b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -123,7 +123,10 @@ public class KafkaStreamingInput implements IStreamingInput {
private Broker getLeadBroker() {
final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ if (partitionMetadata != null) {
+ if (partitionMetadata.errorCode() != 0){
+ logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+ }
replicaBrokers = partitionMetadata.replicas();
return partitionMetadata.leader();
} else {
http://git-wip-us.apache.org/repos/asf/kylin/blob/1f93bec1/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index ab54abb..f506999 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -51,7 +51,10 @@ public final class KafkaUtils {
public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ if (partitionMetadata != null) {
+ if (partitionMetadata.errorCode() != 0){
+ logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode());
+ }
return partitionMetadata.leader();
} else {
return null;