You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/12 10:08:07 UTC
incubator-kylin git commit: 1. fix bug for binary search Kafka
offset; 2. add availability check when hbase table created
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 797d6136a -> 06cd46672
1. fix bug for binary search Kafka offset
2. add availability check when hbase table created
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/06cd4667
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/06cd4667
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/06cd4667
Branch: refs/heads/0.8
Commit: 06cd46672c6121273f7c8fcc49317a6b89a0c6eb
Parents: 797d613
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 12 16:06:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 12 16:07:14 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/hadoop/hbase/CubeHTableUtil.java | 2 ++
.../java/org/apache/kylin/streaming/StreamingUtil.java | 12 ++++++------
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06cd4667/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
index d281d8d..88b345b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
@@ -1,5 +1,6 @@
package org.apache.kylin.job.hadoop.hbase;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -70,6 +71,7 @@ public class CubeHTableUtil {
DeployCoprocessorCLI.deployCoprocessor(tableDesc);
admin.createTable(tableDesc, splitKeys);
+ Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons");
logger.info("create hbase table " + tableName + " done.");
} catch (Exception e) {
logger.error("Failed to create HTable", e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06cd4667/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 142dee1..2560986 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -72,12 +72,12 @@ public final class StreamingUtil {
final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
final long result = binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
- logger.info("found offset:" + result);
+ logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
return result;
}
private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
- while (startOffset <= endOffset + 2) {
+ while (startOffset < endOffset) {
long midOffset = startOffset + ((endOffset - startOffset) >> 1);
long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser);
long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
@@ -85,19 +85,19 @@ public final class StreamingUtil {
// hard to ensure these 2 conditions
// Preconditions.checkArgument(startTimestamp <= midTimestamp);
// Preconditions.checkArgument(midTimestamp <= endTimestamp);
- if (startTimestamp > targetTimestamp) {
+ if (startTimestamp >= targetTimestamp) {
return startOffset;
}
- if (endTimestamp < targetTimestamp) {
+ if (endTimestamp <= targetTimestamp) {
return endOffset;
}
if (targetTimestamp == midTimestamp) {
return midOffset;
} else if (targetTimestamp < midTimestamp) {
- endOffset = midOffset;
+ endOffset = midOffset - 1;
continue;
} else {
- startOffset = midOffset;
+ startOffset = midOffset + 1;
continue;
}
}