You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/08 05:23:37 UTC

[1/8] kylin git commit: KYLIN-1038 retry on job failure

Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 17c33dc49 -> 0ec3ed0e8


KYLIN-1038 retry on job failure


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3af7d4a7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3af7d4a7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3af7d4a7

Branch: refs/heads/2.x-staging
Commit: 3af7d4a72c8308f00fe95276b08f05709eaa62e5
Parents: 17c33dc
Author: shaofengshi <sh...@apache.org>
Authored: Sun Feb 14 21:17:12 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:13:16 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |  3 ++
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++
 .../kylin/job/execution/AbstractExecutable.java | 57 +++++++++++++-------
 .../job/execution/DefaultChainedExecutable.java |  5 ++
 4 files changed, 50 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 78a564d..d694e9f 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -23,6 +23,9 @@ kylin.hbase.cluster.fs=
 
 kylin.job.mapreduce.default.reduce.input.mb=500
 
+# max job retry on error, default 0: no retry
+kylin.job.retry=0
+
 # If true, job engine will not assume that hadoop CLI reside on the same server as it self
 # you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password
 # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 826a28c..487f78e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -538,6 +538,10 @@ public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("crossdomain.enable", "true"));
     }
 
+    public int getJobRetry() {
+        return Integer.parseInt(this.getOptional("kylin.job.retry", "0"));
+    }
+
     public String toString() {
         return getMetadataUrl();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index e1d7106..8d5fea5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -18,13 +18,10 @@
 
 package org.apache.kylin.job.execution;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kylin.common.KylinConfig;
@@ -35,10 +32,12 @@ import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  */
@@ -50,6 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     protected static final String END_TIME = "endTime";
 
     protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+    protected int retry = 0;
 
     private String name;
     private String id;
@@ -99,15 +99,30 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         logger.info("Executing >>>>>>>>>>>>>   " + this.getName() + "   <<<<<<<<<<<<<");
 
         Preconditions.checkArgument(executableContext instanceof DefaultContext);
-        ExecuteResult result;
-        try {
-            onExecuteStart(executableContext);
-            result = doWork(executableContext);
-        } catch (Throwable e) {
-            logger.error("error running Executable", e);
-            onExecuteError(e, executableContext);
-            throw new ExecuteException(e);
+        ExecuteResult result = null;
+
+        onExecuteStart(executableContext);
+        Throwable exception;
+        do {
+            if (retry > 0) {
+                logger.info("Retry " + retry);
+            }
+            exception = null;
+            result = null;
+            try {
+                result = doWork(executableContext);
+            } catch (Throwable e) {
+                logger.error("error running Executable", e);
+                exception = e;
+            }
+            retry++;
+        } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
+        
+        if (exception != null) {
+            onExecuteError(exception, executableContext);
+            throw new ExecuteException(exception);
         }
+        
         onExecuteFinished(result, executableContext);
         return result;
     }
@@ -301,6 +316,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return status == ExecutableState.DISCARDED;
     }
 
+    protected boolean needRetry() {
+        return this.retry <= KylinConfig.getInstanceFromEnv().getJobRetry();
+    }
+
     @Override
     public String toString() {
         return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString();

http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2e95711..7403715 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -106,6 +106,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         return subTasks;
     }
 
+    @Override
+    protected boolean needRetry() {
+        return false;
+    }
+
     public final AbstractExecutable getTaskByName(String name) {
         for (AbstractExecutable task : subTasks) {
             if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {


[8/8] kylin git commit: KYLIN-1421 fix the “Last build time” is always empty issue

Posted by sh...@apache.org.
KYLIN-1421 fix the “Last build time” is always empty issue


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0ec3ed0e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0ec3ed0e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0ec3ed0e

Branch: refs/heads/2.x-staging
Commit: 0ec3ed0e899eeab1497667ed28f10226aec520e7
Parents: d1a574b
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 14:07:00 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:21:56 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0ec3ed0e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index b4182fe..f5cb66e 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -118,6 +118,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
             CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
             segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
             segment.setInputRecords(streamingBatch.getMessages().size());
+            segment.setLastBuildTime(System.currentTimeMillis());
             return segment;
         } catch (IOException e) {
             throw new RuntimeException("failed to create IBuildable", e);


[7/8] kylin git commit: KYLIN-1417 Change to use TreeMap to allow null as value

Posted by sh...@apache.org.
KYLIN-1417 Change to use TreeMap to allow null as value


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d1a574b8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d1a574b8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d1a574b8

Branch: refs/heads/2.x-staging
Commit: d1a574b80ef35ebdae69718998a2dfcaeafc3cbc
Parents: 50aab0b
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 10:29:47 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:21:40 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/kafka/TimedJsonStreamParser.java  | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a574b8/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..e3075d5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -35,11 +35,7 @@
 package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import kafka.message.MessageAndOffset;
 
@@ -102,7 +98,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
     @Override
     public StreamingMessage parse(MessageAndOffset messageAndOffset) {
         try {
-            Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+            root.putAll(message);
             String tsStr = root.get(tsColName);
             //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
             //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));


[3/8] kylin git commit: KYLIN-1420 enhance and update test case

Posted by sh...@apache.org.
KYLIN-1420 enhance and update test case

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/250978d8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/250978d8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/250978d8

Branch: refs/heads/2.x-staging
Commit: 250978d887e577582f26fc05036d6a4af82dfd0b
Parents: 3736f72
Author: shaofengshi <sh...@apache.org>
Authored: Mon Feb 15 18:06:18 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:15:11 2016 +0800

----------------------------------------------------------------------
 .../kylin/gridtable/GTScanRangePlanner.java     | 27 ++++++++------------
 .../kylin/gridtable/DictGridTableTest.java      |  2 +-
 2 files changed, 11 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/250978d8/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index d314dde..559a245 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -1,17 +1,8 @@
 package org.apache.kylin.gridtable;
 
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -26,9 +17,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.util.*;
 
 public class GTScanRangePlanner {
 
@@ -108,8 +97,10 @@ public class GTScanRangePlanner {
         for (ColumnRange range : andDimRanges) {
             if (partitionColRef != null && range.column.equals(partitionColRef)) {
                 if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
-                        && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) <= 0) {
-                    //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <=. 
+                        && (rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0 //
+                        || rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) == 0 //
+                        && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) {
+                    //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so treat the segment end as inclusive (<=) only when the filter operator admits equality (EQ, LTE, GTE, IN).
                 } else {
                     logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}",//
                             new Object[] { partitionColRef, makeReadable(segmentStartAndEnd.getFirst()), makeReadable(segmentStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });
@@ -346,9 +337,11 @@ public class GTScanRangePlanner {
         private ByteArray begin = ByteArray.EMPTY;
         private ByteArray end = ByteArray.EMPTY;
         private Set<ByteArray> valueSet;
+        private FilterOperatorEnum op;
 
         public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
             this.column = column;
+            this.op = op;
 
             switch (op) {
             case EQ:

http://git-wip-us.apache.org/repos/asf/kylin/blob/250978d8/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index df69c17..674aa15 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -118,7 +118,7 @@ public class DictGridTableTest {
         {
             LogicalTupleFilter filter = and(timeComp4, ageComp1);
             List<GTScanRange> r = planner.planScanRanges(filter);
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);


[2/8] kylin git commit: KYLIN-1420 Query returns empty result on partition column's boundary condition

Posted by sh...@apache.org.
KYLIN-1420 Query returns empty result on partition column's boundary condition


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3736f72c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3736f72c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3736f72c

Branch: refs/heads/2.x-staging
Commit: 3736f72cc756af28a39a473efc47bec943ba7fc9
Parents: 3af7d4a
Author: shaofengshi <sh...@apache.org>
Authored: Mon Feb 15 15:52:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:14:18 2016 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3736f72c/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index a72426d..d314dde 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -108,8 +108,8 @@ public class GTScanRangePlanner {
         for (ColumnRange range : andDimRanges) {
             if (partitionColRef != null && range.column.equals(partitionColRef)) {
                 if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
-                        && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0) {
-                    //segment range is [Closed,Open)
+                        && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) <= 0) {
+                    //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <=. 
                 } else {
                     logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}",//
                             new Object[] { partitionColRef, makeReadable(segmentStartAndEnd.getFirst()), makeReadable(segmentStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });


[4/8] kylin git commit: KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure

Posted by sh...@apache.org.
KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure

Conflicts:
	engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/929c7a49
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/929c7a49
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/929c7a49

Branch: refs/heads/2.x-staging
Commit: 929c7a4908a3cd655ab31e71eb6453971f3acd36
Parents: 250978d
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 2 17:34:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:19:36 2016 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/CompoundCuboidWriter.java  | 57 ++++++++++++++
 .../kylin/cube/inmemcubing/ICuboidWriter.java   |  4 +-
 .../kylin/job/constant/ExecutableConstants.java |  1 +
 .../kylin/engine/mr/steps/KVGTRecordWriter.java | 81 ++++++++++++++++++++
 .../mr/steps/MapContextGTRecordWriter.java      | 68 ++--------------
 .../streaming/cube/StreamingCubeBuilder.java    | 12 ++-
 .../storage/hbase/steps/HBaseCuboidWriter.java  | 24 +++---
 .../hbase/steps/HBaseMROutput2Transition.java   |  2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  2 +-
 .../hbase/steps/HBaseStreamingOutput.java       |  8 +-
 .../hbase/steps/SequenceFileCuboidWriter.java   | 75 ++++++++++++++++++
 11 files changed, 254 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
new file mode 100644
index 0000000..46eef50
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public class CompoundCuboidWriter implements ICuboidWriter {
+
+    private Iterable<ICuboidWriter> cuboidWriters;
+
+    public CompoundCuboidWriter(Iterable<ICuboidWriter> cuboidWriters) {
+        this.cuboidWriters = cuboidWriters;
+
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.write(cuboidId, record);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.flush();
+        }
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.close();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 9e26e5e..e6cfa02 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -27,7 +27,7 @@ public interface ICuboidWriter {
 
     void write(long cuboidId, GTRecord record) throws IOException;
 
-    void flush();
+    void flush() throws IOException;
     
-    void close();
+    void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ba50880..d370b0d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,6 +56,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+    public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
 
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
new file mode 100644
index 0000000..e201705
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public abstract class KVGTRecordWriter implements ICuboidWriter {
+
+    private static final Log logger = LogFactory.getLog(KVGTRecordWriter.class);
+    private Long lastCuboidId;
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+
+    private AbstractRowKeyEncoder rowKeyEncoder;
+    private int dimensions;
+    private int measureCount;
+    private byte[] keyBuf;
+    private int[] measureColumnsIndex;
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+    private long cuboidRowCount = 0;
+
+    //for shard
+
+    public KVGTRecordWriter(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.measureCount = cubeDesc.getMeasures().size();
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+
+        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+            if (lastCuboidId != null) {
+                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+                cuboidRowCount = 0;
+            }
+            // output another cuboid
+            initVariables(cuboidId);
+            lastCuboidId = cuboidId;
+        }
+
+        cuboidRowCount++;
+        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
+
+        //output measures
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        outputKey.set(keyBuf, 0, keyBuf.length);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        writeAsKeyValue(outputKey, outputValue);
+    }
+
+    protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException;
+
+    private void initVariables(Long cuboidId) {
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
+        keyBuf = rowKeyEncoder.createBuf();
+
+        dimensions = Long.bitCount(cuboidId);
+        measureColumnsIndex = new int[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 8416d95..6b4d07d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -1,76 +1,32 @@
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
 
 /**
  */
-public class MapContextGTRecordWriter implements ICuboidWriter {
+public class MapContextGTRecordWriter extends KVGTRecordWriter {
 
     private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
     protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
-    private Long lastCuboidId;
-    protected CubeSegment cubeSegment;
-    protected CubeDesc cubeDesc;
-
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private int dimensions;
-    private int measureCount;
-    private byte[] keyBuf;
-    private int[] measureColumnsIndex;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-    private long cuboidRowCount = 0;
-
-    //for shard
 
     public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        super(cubeDesc, cubeSegment);
         this.mapContext = mapContext;
-        this.cubeDesc = cubeDesc;
-        this.cubeSegment = cubeSegment;
-        this.measureCount = cubeDesc.getMeasures().size();
     }
 
     @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-
-        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
-            if (lastCuboidId != null) {
-                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
-                cuboidRowCount = 0;
-            }
-            // output another cuboid
-            initVariables(cuboidId);
-            lastCuboidId = cuboidId;
-        }
-
-        cuboidRowCount++;
-        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
-
-        //output measures
-        valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
-
-        outputKey.set(keyBuf, 0, keyBuf.length);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
         try {
-            mapContext.write(outputKey, outputValue);
+            mapContext.write(key, value);
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            throw new IOException(e);
         }
     }
 
@@ -84,14 +40,4 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
 
     }
 
-    private void initVariables(Long cuboidId) {
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
-        keyBuf = rowKeyEncoder.createBuf();
-
-        dimensions = Long.bitCount(cuboidId);
-        measureColumnsIndex = new int[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index c4f2b7e..ec2ad91 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -99,6 +99,14 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
             throw new RuntimeException(e);
         } catch (ExecutionException e) {
             throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } catch (IOException e) {
+            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } finally {
+            try {
+                cuboidWriter.close();
+            } catch (IOException e) {
+                throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+            }
         }
     }
 
@@ -107,7 +115,9 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         try {
-            return cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+            CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+            segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+            return segment;
         } catch (IOException e) {
             throw new RuntimeException("failed to create IBuildable", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index c4dc0b5..ddc868d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -33,9 +33,8 @@
  */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
@@ -51,13 +50,14 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
 
 /**
  */
-public final class HBaseCuboidWriter implements ICuboidWriter {
+public class HBaseCuboidWriter implements ICuboidWriter {
 
-    private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
+    private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
 
     private static final int BATCH_PUT_THRESHOLD = 10000;
 
@@ -125,8 +125,8 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
         }
     }
 
-    public final void flush() {
-        try {
+    @Override
+    public final void flush() throws IOException {
             if (!puts.isEmpty()) {
                 long t = System.currentTimeMillis();
                 if (hTable != null) {
@@ -136,14 +136,12 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
                 logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
                 puts.clear();
             }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @Override
-    public void close() {
-
+    public void close() throws IOException {
+        flush();
+        IOUtils.closeQuietly(hTable);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 4c2737d..7bb3647 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -80,7 +80,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
             @Override
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeGCStep());
+                steps.addMergingGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 2a21640..a828728 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -161,7 +161,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         toDeletePaths.addAll(getMergingHDFSPaths());
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
         step.setDeletePaths(toDeletePaths);
         step.setJobId(jobId);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 770be3c..4cc4794 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -18,9 +18,11 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -54,7 +57,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
 
             final HTableInterface hTable;
             hTable = createHTable(cubeSegment);
-            return new HBaseCuboidWriter(cubeSegment, hTable);
+            List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
+            cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
+            cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
+            return new CompoundCuboidWriter(cuboidWriters);
         } catch (IOException e) {
             throw new RuntimeException("failed to get ICuboidWriter", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
new file mode 100644
index 0000000..4d76522
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -0,0 +1,75 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.KVGTRecordWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * KVGTRecordWriter that appends each key/value pair to a Hadoop SequenceFile ("data.seq" under the segment's cuboid root path), creating the file on the first write. */
+public class SequenceFileCuboidWriter extends KVGTRecordWriter {
+    // NOTE(review): writeAsKeyValue() closes the FileSystem it obtains from HadoopUtil.getFileSystem(); if that instance is shared, closing it could affect other users -- confirm.
+    private static final Logger logger = LoggerFactory.getLogger(SequenceFileCuboidWriter.class);
+    private SequenceFile.Writer writer = null; // stays null until the first writeAsKeyValue() call; NOTE(review): non-volatile, yet tested outside the class-level lock there -- confirm single-threaded use
+
+    public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
+        super(cubeDesc, segment);
+    }
+
+
+    @Override
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+        if (writer == null) {
+            synchronized (SequenceFileCuboidWriter.class) {
+                if (writer == null) {
+                    JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
+                    String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+                    Path cuboidPath = new Path(cuboidRoot);
+                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+                    try {
+                        if (fs.exists(cuboidPath)) {
+                            fs.delete(cuboidPath, true);
+                        }
+
+                        fs.mkdirs(cuboidPath);
+                    } finally {
+                        IOUtils.closeQuietly(fs);
+                    }
+
+                    Path cuboidFile = new Path(cuboidPath, "data.seq");
+                    logger.debug("Cuboid is written to " + cuboidFile);
+                    writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+                }
+            }
+        }
+
+        Text outputValue = new Text();
+        Text outputKey = new Text();
+        outputKey.set(key.array(), key.offset(), key.length());
+        outputValue.set(value.array(), value.offset(), value.length());
+        writer.append(outputKey, outputValue);
+    }
+    // Pushes data written so far out through hflush(); does nothing if the writer has not been created yet.
+    @Override
+    public void flush() throws IOException {
+        if (writer != null) {
+            writer.hflush();
+        }
+    }
+    // Closes the writer via closeQuietly, which tolerates a null writer and ignores any IOException raised while closing, so a failed close is not reported to the caller.
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeQuietly(writer);
+    }
+}


[5/8] kylin git commit: KYLIN-1387 should support empty segment

Posted by sh...@apache.org.
KYLIN-1387 should support empty segment


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9352e5a9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9352e5a9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9352e5a9

Branch: refs/heads/2.x-staging
Commit: 9352e5a9e87e8b7bae41303ac1556ca7b7dc0023
Parents: 929c7a4
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 09:42:34 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:19:54 2016 +0800

----------------------------------------------------------------------
 .../hbase/steps/SequenceFileCuboidWriter.java   | 48 +++++++++++---------
 1 file changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/9352e5a9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
index 4d76522..8c2d5e4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -25,35 +25,39 @@ public class SequenceFileCuboidWriter extends KVGTRecordWriter {
 
     public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
         super(cubeDesc, segment);
+        try {
+            initiate();
+        } catch (IOException e) {
+           throw new RuntimeException(e);
+        }
     }
 
-
-    @Override
-    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+    protected void initiate() throws IOException {
         if (writer == null) {
-            synchronized (SequenceFileCuboidWriter.class) {
-                if (writer == null) {
-                    JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
-                    String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
-                    Path cuboidPath = new Path(cuboidRoot);
-                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
-                    try {
-                        if (fs.exists(cuboidPath)) {
-                            fs.delete(cuboidPath, true);
-                        }
-
-                        fs.mkdirs(cuboidPath);
-                    } finally {
-                        IOUtils.closeQuietly(fs);
-                    }
-
-                    Path cuboidFile = new Path(cuboidPath, "data.seq");
-                    logger.debug("Cuboid is written to " + cuboidFile);
-                    writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+            JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
+            String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+            Path cuboidPath = new Path(cuboidRoot);
+            FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+            try {
+                if (fs.exists(cuboidPath)) {
+                    fs.delete(cuboidPath, true);
                 }
+
+                fs.mkdirs(cuboidPath);
+            } finally {
+                IOUtils.closeQuietly(fs);
             }
+
+            Path cuboidFile = new Path(cuboidPath, "data.seq");
+            logger.debug("Cuboid is written to " + cuboidFile);
+            writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
         }
 
+    }
+
+    @Override
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+       
         Text outputValue = new Text();
         Text outputKey = new Text();
         outputKey.set(key.array(), key.offset(), key.length());


[6/8] kylin git commit: KYLIN-1421 Cube "source record" is always zero for streaming

Posted by sh...@apache.org.
KYLIN-1421 Cube "source record" is always zero for streaming


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/50aab0b0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/50aab0b0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/50aab0b0

Branch: refs/heads/2.x-staging
Commit: 50aab0b08ea1532998db1f11084c1ae908a42e56
Parents: 9352e5a
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 10:30:05 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:20:05 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/50aab0b0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index ec2ad91..b4182fe 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -117,6 +117,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         try {
             CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
             segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+            segment.setInputRecords(streamingBatch.getMessages().size());
             return segment;
         } catch (IOException e) {
             throw new RuntimeException("failed to create IBuildable", e);