Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/12 10:08:07 UTC

incubator-kylin git commit: 1. fix bug in binary search for Kafka offset; 2. add availability check when HBase table is created

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 797d6136a -> 06cd46672


1. fix bug in binary search for Kafka offset
2. add availability check when HBase table is created


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/06cd4667
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/06cd4667
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/06cd4667

Branch: refs/heads/0.8
Commit: 06cd46672c6121273f7c8fcc49317a6b89a0c6eb
Parents: 797d613
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 12 16:06:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 12 16:07:14 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/hadoop/hbase/CubeHTableUtil.java   |  2 ++
 .../java/org/apache/kylin/streaming/StreamingUtil.java  | 12 ++++++------
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06cd4667/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
index d281d8d..88b345b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.job.hadoop.hbase;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -70,6 +71,7 @@ public class CubeHTableUtil {
             DeployCoprocessorCLI.deployCoprocessor(tableDesc);
 
             admin.createTable(tableDesc, splitKeys);
+            Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " was created but is not available");
             logger.info("create hbase table " + tableName + " done.");
         } catch (Exception e) {
             logger.error("Failed to create HTable", e);

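For context, the check added above boils down to the following pattern (a minimal standalone sketch against the HBase 0.98-era HBaseAdmin API used in CubeHTableUtil; the table name and column family below are placeholder values, not taken from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    import com.google.common.base.Preconditions;

    public class CreateTableSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            HBaseAdmin admin = new HBaseAdmin(conf);    // pre-1.0 client API
            String tableName = "example_table";         // placeholder, not from the commit
            try {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                tableDesc.addFamily(new HColumnDescriptor("F1")); // placeholder column family
                admin.createTable(tableDesc);
                // createTable() returning does not always mean every region is online;
                // fail fast here rather than let a later step hit an unavailable table.
                Preconditions.checkArgument(admin.isTableAvailable(tableName),
                        "table " + tableName + " was created but is not available");
            } finally {
                admin.close();
            }
        }
    }

createTable() is documented as synchronous in most versions, so the extra isTableAvailable() call is a belt-and-braces guard: it turns a table that came back only partially online into an immediate failure instead of a confusing error later in the build.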
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/06cd4667/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 142dee1..2560986 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -72,12 +72,12 @@ public final class StreamingUtil {
         final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
         logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
         final long result = binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
-        logger.info("found offset:" + result);
+        logger.info(String.format("topic: %s, partitionId: %d, found offset: %d", topic, partitionId, result));
         return result;
     }
 
     private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
-        while (startOffset <= endOffset + 2) {
+        while (startOffset < endOffset) {
             long midOffset = startOffset + ((endOffset - startOffset) >> 1);
             long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser);
             long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
@@ -85,19 +85,19 @@ public final class StreamingUtil {
             // hard to ensure these 2 conditions
 //            Preconditions.checkArgument(startTimestamp <= midTimestamp);
 //            Preconditions.checkArgument(midTimestamp <= endTimestamp);
-            if (startTimestamp > targetTimestamp) {
+            if (startTimestamp >= targetTimestamp) {
                 return startOffset;
             }
-            if (endTimestamp < targetTimestamp) {
+            if (endTimestamp <= targetTimestamp) {
                 return endOffset;
             }
             if (targetTimestamp == midTimestamp) {
                 return midOffset;
             } else if (targetTimestamp < midTimestamp) {
-                endOffset = midOffset;
+                endOffset = midOffset - 1;
                 continue;
             } else {
-                startOffset = midOffset;
+                startOffset = midOffset + 1;
                 continue;
             }
         }
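Taken together, the loop changes make the search range strictly shrink: the old code assigned startOffset/endOffset to midOffset itself, so a two-element range could stop converging, and the old "<= endOffset + 2" guard only papered over that. The fixed logic is equivalent to the standalone sketch below, where lookupTimestamp() is a toy stand-in for getDataTimestamp() (assumed non-decreasing in the offset) and the final "return startOffset" is an assumption, since the code after the loop falls outside this hunk:

    public class OffsetSearchSketch {

        // Toy stand-in for getDataTimestamp(): message timestamps indexed by
        // offset, assumed non-decreasing. The real method fetches and parses
        // the Kafka message at that offset.
        private static final long[] TIMESTAMPS = {100, 200, 300, 400, 500, 600};

        static long lookupTimestamp(long offset) {
            return TIMESTAMPS[(int) offset];
        }

        static long binarySearch(long startOffset, long endOffset, long target) {
            while (startOffset < endOffset) {
                long midOffset = startOffset + ((endOffset - startOffset) >> 1);
                long startTs = lookupTimestamp(startOffset);
                long endTs = lookupTimestamp(endOffset);
                long midTs = lookupTimestamp(midOffset);
                if (startTs >= target) {
                    return startOffset; // whole range is at or past the target
                }
                if (endTs <= target) {
                    return endOffset;   // whole range is at or before the target
                }
                if (midTs == target) {
                    return midOffset;
                } else if (target < midTs) {
                    endOffset = midOffset - 1;   // exclude mid: range shrinks every pass
                } else {
                    startOffset = midOffset + 1; // exclude mid: range shrinks every pass
                }
            }
            return startOffset; // assumption: start == end here, offset closest to target
        }

        public static void main(String[] args) {
            // Finds the first offset whose timestamp is at or past 350 (offset 3 here).
            System.out.println(binarySearch(0, 5, 350));
        }
    }

The relaxed boundary tests (>= and <= instead of > and <) also let the search return immediately when the target timestamp falls outside the [startOffset, endOffset] window, which the strict comparisons previously missed by one.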