You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/08 05:23:37 UTC
[1/8] kylin git commit: KYLIN-1038 retry on job failure
Repository: kylin
Updated Branches:
refs/heads/2.x-staging 17c33dc49 -> 0ec3ed0e8
KYLIN-1038 retry on job failure
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3af7d4a7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3af7d4a7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3af7d4a7
Branch: refs/heads/2.x-staging
Commit: 3af7d4a72c8308f00fe95276b08f05709eaa62e5
Parents: 17c33dc
Author: shaofengshi <sh...@apache.org>
Authored: Sun Feb 14 21:17:12 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:13:16 2016 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 3 ++
.../apache/kylin/common/KylinConfigBase.java | 4 ++
.../kylin/job/execution/AbstractExecutable.java | 57 +++++++++++++-------
.../job/execution/DefaultChainedExecutable.java | 5 ++
4 files changed, 50 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 78a564d..d694e9f 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -23,6 +23,9 @@ kylin.hbase.cluster.fs=
kylin.job.mapreduce.default.reduce.input.mb=500
+# max job retry on error, default 0: no retry
+kylin.job.retry=0
+
# If true, job engine will not assume that hadoop CLI reside on the same server as it self
# you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password
# It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine
http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 826a28c..487f78e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -538,6 +538,10 @@ public class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("crossdomain.enable", "true"));
}
+ public int getJobRetry() {
+ return Integer.parseInt(this.getOptional("kylin.job.retry", "0"));
+ }
+
public String toString() {
return getMetadataUrl();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index e1d7106..8d5fea5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -18,13 +18,10 @@
package org.apache.kylin.job.execution;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kylin.common.KylinConfig;
@@ -35,10 +32,12 @@ import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
/**
*/
@@ -50,6 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected static final String END_TIME = "endTime";
protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+ protected int retry = 0;
private String name;
private String id;
@@ -99,15 +99,30 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
logger.info("Executing >>>>>>>>>>>>> " + this.getName() + " <<<<<<<<<<<<<");
Preconditions.checkArgument(executableContext instanceof DefaultContext);
- ExecuteResult result;
- try {
- onExecuteStart(executableContext);
- result = doWork(executableContext);
- } catch (Throwable e) {
- logger.error("error running Executable", e);
- onExecuteError(e, executableContext);
- throw new ExecuteException(e);
+ ExecuteResult result = null;
+
+ onExecuteStart(executableContext);
+ Throwable exception;
+ do {
+ if (retry > 0) {
+ logger.info("Retry " + retry);
+ }
+ exception = null;
+ result = null;
+ try {
+ result = doWork(executableContext);
+ } catch (Throwable e) {
+ logger.error("error running Executable", e);
+ exception = e;
+ }
+ retry++;
+ } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true);
+
+ if (exception != null) {
+ onExecuteError(exception, executableContext);
+ throw new ExecuteException(exception);
}
+
onExecuteFinished(result, executableContext);
return result;
}
@@ -301,6 +316,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return status == ExecutableState.DISCARDED;
}
+ protected boolean needRetry() {
+ return this.retry <= KylinConfig.getInstanceFromEnv().getJobRetry();
+ }
+
@Override
public String toString() {
return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString();
http://git-wip-us.apache.org/repos/asf/kylin/blob/3af7d4a7/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2e95711..7403715 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -106,6 +106,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
return subTasks;
}
+ @Override
+ protected boolean needRetry() {
+ return false;
+ }
+
public final AbstractExecutable getTaskByName(String name) {
for (AbstractExecutable task : subTasks) {
if (task.getName() != null && task.getName().equalsIgnoreCase(name)) {
[8/8] kylin git commit: KYLIN-1421 fix the "Last build time" is always empty issue
Posted by sh...@apache.org.
KYLIN-1421 fix the "Last build time" is always empty issue
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0ec3ed0e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0ec3ed0e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0ec3ed0e
Branch: refs/heads/2.x-staging
Commit: 0ec3ed0e899eeab1497667ed28f10226aec520e7
Parents: d1a574b
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 14:07:00 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:21:56 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/0ec3ed0e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index b4182fe..f5cb66e 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -118,6 +118,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
segment.setInputRecords(streamingBatch.getMessages().size());
+ segment.setLastBuildTime(System.currentTimeMillis());
return segment;
} catch (IOException e) {
throw new RuntimeException("failed to create IBuildable", e);
[7/8] kylin git commit: KYLIN-1417 Change to use TreeMap to allow
null as value
Posted by sh...@apache.org.
KYLIN-1417 Change to use TreeMap to allow null as value
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d1a574b8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d1a574b8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d1a574b8
Branch: refs/heads/2.x-staging
Commit: d1a574b80ef35ebdae69718998a2dfcaeafc3cbc
Parents: 50aab0b
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 10:29:47 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:21:40 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/source/kafka/TimedJsonStreamParser.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a574b8/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 0907623..e3075d5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -35,11 +35,7 @@
package org.apache.kylin.source.kafka;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import kafka.message.MessageAndOffset;
@@ -102,7 +98,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
@Override
public StreamingMessage parse(MessageAndOffset messageAndOffset) {
try {
- Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+ Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+ Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
+ root.putAll(message);
String tsStr = root.get(tsColName);
//Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
//" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));
[3/8] kylin git commit: KYLIN-1420 enhance and update test case
Posted by sh...@apache.org.
KYLIN-1420 enhance and update test case
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/250978d8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/250978d8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/250978d8
Branch: refs/heads/2.x-staging
Commit: 250978d887e577582f26fc05036d6a4af82dfd0b
Parents: 3736f72
Author: shaofengshi <sh...@apache.org>
Authored: Mon Feb 15 18:06:18 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:15:11 2016 +0800
----------------------------------------------------------------------
.../kylin/gridtable/GTScanRangePlanner.java | 27 ++++++++------------
.../kylin/gridtable/DictGridTableTest.java | 2 +-
2 files changed, 11 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/250978d8/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index d314dde..559a245 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -1,17 +1,8 @@
package org.apache.kylin.gridtable;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
@@ -26,9 +17,7 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.util.*;
public class GTScanRangePlanner {
@@ -108,8 +97,10 @@ public class GTScanRangePlanner {
for (ColumnRange range : andDimRanges) {
if (partitionColRef != null && range.column.equals(partitionColRef)) {
if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
- && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) <= 0) {
- //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <=.
+ && (rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0 //
+ || rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) == 0 //
+ && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) {
+ //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <= when has equals in condition.
} else {
logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}",//
new Object[] { partitionColRef, makeReadable(segmentStartAndEnd.getFirst()), makeReadable(segmentStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });
@@ -346,9 +337,11 @@ public class GTScanRangePlanner {
private ByteArray begin = ByteArray.EMPTY;
private ByteArray end = ByteArray.EMPTY;
private Set<ByteArray> valueSet;
+ private FilterOperatorEnum op;
public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
this.column = column;
+ this.op = op;
switch (op) {
case EQ:
http://git-wip-us.apache.org/repos/asf/kylin/blob/250978d8/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index df69c17..674aa15 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -118,7 +118,7 @@ public class DictGridTableTest {
{
LogicalTupleFilter filter = and(timeComp4, ageComp1);
List<GTScanRange> r = planner.planScanRanges(filter);
- assertEquals(0, r.size());
+ assertEquals(1, r.size());
}
{
LogicalTupleFilter filter = and(timeComp5, ageComp1);
[2/8] kylin git commit: KYLIN-1420 Query returns empty result on
partition column's boundary condition
Posted by sh...@apache.org.
KYLIN-1420 Query returns empty result on partition column's boundary condition
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3736f72c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3736f72c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3736f72c
Branch: refs/heads/2.x-staging
Commit: 3736f72cc756af28a39a473efc47bec943ba7fc9
Parents: 3af7d4a
Author: shaofengshi <sh...@apache.org>
Authored: Mon Feb 15 15:52:15 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:14:18 2016 +0800
----------------------------------------------------------------------
.../main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3736f72c/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index a72426d..d314dde 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -108,8 +108,8 @@ public class GTScanRangePlanner {
for (ColumnRange range : andDimRanges) {
if (partitionColRef != null && range.column.equals(partitionColRef)) {
if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 //
- && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0) {
- //segment range is [Closed,Open)
+ && rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) <= 0) {
+ //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <=.
} else {
logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}",//
new Object[] { partitionColRef, makeReadable(segmentStartAndEnd.getFirst()), makeReadable(segmentStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) });
[4/8] kylin git commit: KYLIN-1387 Streaming cubing doesn't generate
cuboids files on HDFS, cause cube merge failure
Posted by sh...@apache.org.
KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure
Conflicts:
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/929c7a49
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/929c7a49
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/929c7a49
Branch: refs/heads/2.x-staging
Commit: 929c7a4908a3cd655ab31e71eb6453971f3acd36
Parents: 250978d
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 2 17:34:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:19:36 2016 +0800
----------------------------------------------------------------------
.../cube/inmemcubing/CompoundCuboidWriter.java | 57 ++++++++++++++
.../kylin/cube/inmemcubing/ICuboidWriter.java | 4 +-
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/engine/mr/steps/KVGTRecordWriter.java | 81 ++++++++++++++++++++
.../mr/steps/MapContextGTRecordWriter.java | 68 ++--------------
.../streaming/cube/StreamingCubeBuilder.java | 12 ++-
.../storage/hbase/steps/HBaseCuboidWriter.java | 24 +++---
.../hbase/steps/HBaseMROutput2Transition.java | 2 +-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 2 +-
.../hbase/steps/HBaseStreamingOutput.java | 8 +-
.../hbase/steps/SequenceFileCuboidWriter.java | 75 ++++++++++++++++++
11 files changed, 254 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
new file mode 100644
index 0000000..46eef50
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public class CompoundCuboidWriter implements ICuboidWriter {
+
+ private Iterable<ICuboidWriter> cuboidWriters;
+
+ public CompoundCuboidWriter(Iterable<ICuboidWriter> cuboidWriters) {
+ this.cuboidWriters = cuboidWriters;
+
+ }
+
+ @Override
+ public void write(long cuboidId, GTRecord record) throws IOException {
+ for (ICuboidWriter writer : cuboidWriters) {
+ writer.write(cuboidId, record);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ for (ICuboidWriter writer : cuboidWriters) {
+ writer.flush();
+ }
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (ICuboidWriter writer : cuboidWriters) {
+ writer.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 9e26e5e..e6cfa02 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -27,7 +27,7 @@ public interface ICuboidWriter {
void write(long cuboidId, GTRecord record) throws IOException;
- void flush();
+ void flush() throws IOException;
- void close();
+ void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ba50880..d370b0d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,6 +56,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+ public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
new file mode 100644
index 0000000..e201705
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ */
/**
 * Base {@link ICuboidWriter} that encodes a (cuboidId, GTRecord) pair into a
 * rowkey/value pair and hands it to a subclass-specific sink via
 * {@link #writeAsKeyValue}. Encoder state is rebuilt whenever the incoming
 * cuboid id changes, so records are expected to arrive grouped by cuboid.
 */
public abstract class KVGTRecordWriter implements ICuboidWriter {

    private static final Log logger = LogFactory.getLog(KVGTRecordWriter.class);
    private Long lastCuboidId; // cuboid currently being written; null until the first write()
    protected CubeSegment cubeSegment;
    protected CubeDesc cubeDesc;

    private AbstractRowKeyEncoder rowKeyEncoder; // re-created per cuboid in initVariables()
    private int dimensions; // dimension count of the current cuboid (bit count of its id)
    private int measureCount;
    private byte[] keyBuf; // reusable rowkey buffer, sized by the encoder per cuboid
    private int[] measureColumnsIndex; // record column indices holding the measures
    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
    private ByteArrayWritable outputKey = new ByteArrayWritable();
    private ByteArrayWritable outputValue = new ByteArrayWritable();
    private long cuboidRowCount = 0; // rows written for the current cuboid, logged on switch

    //for shard

    public KVGTRecordWriter(CubeDesc cubeDesc, CubeSegment cubeSegment) {
        this.cubeDesc = cubeDesc;
        this.cubeSegment = cubeSegment;
        this.measureCount = cubeDesc.getMeasures().size();
    }

    /**
     * Encodes one record of the given cuboid and forwards it to
     * {@link #writeAsKeyValue}. Not thread-safe: key/value buffers are reused
     * across calls.
     */
    @Override
    public void write(long cuboidId, GTRecord record) throws IOException {

        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
            if (lastCuboidId != null) {
                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
                cuboidRowCount = 0;
            }
            // output another cuboid
            initVariables(cuboidId);
            lastCuboidId = cuboidId;
        }

        cuboidRowCount++;
        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);

        //output measures
        valueBuf.clear();
        record.exportColumns(measureColumnsIndex, valueBuf);

        outputKey.set(keyBuf, 0, keyBuf.length);
        outputValue.set(valueBuf.array(), 0, valueBuf.position());
        writeAsKeyValue(outputKey, outputValue);
    }

    /** Sink for one encoded key/value pair; implemented per output (MR context, sequence file, ...). */
    protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException;

    // Rebuilds the per-cuboid encoder state: rowkey encoder/buffer and the
    // measure column indices (measures follow the cuboid's dimensions in the record).
    private void initVariables(Long cuboidId) {
        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
        keyBuf = rowKeyEncoder.createBuf();

        dimensions = Long.bitCount(cuboidId);
        measureColumnsIndex = new int[measureCount];
        for (int i = 0; i < measureCount; i++) {
            measureColumnsIndex[i] = dimensions + i;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 8416d95..6b4d07d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -1,76 +1,32 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
/**
*/
-public class MapContextGTRecordWriter implements ICuboidWriter {
+public class MapContextGTRecordWriter extends KVGTRecordWriter {
private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
- private Long lastCuboidId;
- protected CubeSegment cubeSegment;
- protected CubeDesc cubeDesc;
-
- private AbstractRowKeyEncoder rowKeyEncoder;
- private int dimensions;
- private int measureCount;
- private byte[] keyBuf;
- private int[] measureColumnsIndex;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private ByteArrayWritable outputKey = new ByteArrayWritable();
- private ByteArrayWritable outputValue = new ByteArrayWritable();
- private long cuboidRowCount = 0;
-
- //for shard
public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+ super(cubeDesc, cubeSegment);
this.mapContext = mapContext;
- this.cubeDesc = cubeDesc;
- this.cubeSegment = cubeSegment;
- this.measureCount = cubeDesc.getMeasures().size();
}
@Override
- public void write(long cuboidId, GTRecord record) throws IOException {
-
- if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
- if (lastCuboidId != null) {
- logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
- cuboidRowCount = 0;
- }
- // output another cuboid
- initVariables(cuboidId);
- lastCuboidId = cuboidId;
- }
-
- cuboidRowCount++;
- rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
-
- //output measures
- valueBuf.clear();
- record.exportColumns(measureColumnsIndex, valueBuf);
-
- outputKey.set(keyBuf, 0, keyBuf.length);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
try {
- mapContext.write(outputKey, outputValue);
+ mapContext.write(key, value);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new IOException(e);
}
}
@@ -84,14 +40,4 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
}
- private void initVariables(Long cuboidId) {
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
- keyBuf = rowKeyEncoder.createBuf();
-
- dimensions = Long.bitCount(cuboidId);
- measureColumnsIndex = new int[measureCount];
- for (int i = 0; i < measureCount; i++) {
- measureColumnsIndex[i] = dimensions + i;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index c4f2b7e..ec2ad91 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -99,6 +99,14 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+ } catch (IOException e) {
+ throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+ } finally {
+ try {
+ cuboidWriter.close();
+ } catch (IOException e) {
+ throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+ }
}
}
@@ -107,7 +115,9 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
try {
- return cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+ CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+ segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+ return segment;
} catch (IOException e) {
throw new RuntimeException("failed to create IBuildable", e);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index c4dc0b5..ddc868d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -33,9 +33,8 @@
*/
package org.apache.kylin.storage.hbase.steps;
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
@@ -51,13 +50,14 @@ import org.apache.kylin.gridtable.GTRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
/**
*/
-public final class HBaseCuboidWriter implements ICuboidWriter {
+public class HBaseCuboidWriter implements ICuboidWriter {
- private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
+ private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
private static final int BATCH_PUT_THRESHOLD = 10000;
@@ -125,8 +125,8 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
}
}
- public final void flush() {
- try {
+ @Override
+ public final void flush() throws IOException {
if (!puts.isEmpty()) {
long t = System.currentTimeMillis();
if (hTable != null) {
@@ -136,14 +136,12 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
puts.clear();
}
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
}
@Override
- public void close() {
-
+ public void close() throws IOException {
+ flush();
+ IOUtils.closeQuietly(hTable);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 4c2737d..7bb3647 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -80,7 +80,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createMergeGCStep());
+ steps.addMergingGarbageCollectionSteps(jobFlow);
}
};
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 2a21640..a828728 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -161,7 +161,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
toDeletePaths.addAll(getMergingHDFSPaths());
HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
step.setDeletePaths(toDeletePaths);
step.setJobId(jobId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 770be3c..4cc4794 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -18,9 +18,11 @@
package org.apache.kylin.storage.hbase.steps;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -54,7 +57,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
final HTableInterface hTable;
hTable = createHTable(cubeSegment);
- return new HBaseCuboidWriter(cubeSegment, hTable);
+ List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
+ cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
+ cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
+ return new CompoundCuboidWriter(cuboidWriters);
} catch (IOException e) {
throw new RuntimeException("failed to get ICuboidWriter", e);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/929c7a49/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
new file mode 100644
index 0000000..4d76522
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -0,0 +1,75 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.KVGTRecordWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ */
+public class SequenceFileCuboidWriter extends KVGTRecordWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(SequenceFileCuboidWriter.class);
+ private SequenceFile.Writer writer = null;
+
+ public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
+ super(cubeDesc, segment);
+ }
+
+
+ @Override
+ protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+ if (writer == null) {
+ synchronized (SequenceFileCuboidWriter.class) {
+ if (writer == null) {
+ JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
+ String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+ Path cuboidPath = new Path(cuboidRoot);
+ FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+ try {
+ if (fs.exists(cuboidPath)) {
+ fs.delete(cuboidPath, true);
+ }
+
+ fs.mkdirs(cuboidPath);
+ } finally {
+ IOUtils.closeQuietly(fs);
+ }
+
+ Path cuboidFile = new Path(cuboidPath, "data.seq");
+ logger.debug("Cuboid is written to " + cuboidFile);
+ writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+ }
+ }
+ }
+
+ Text outputValue = new Text();
+ Text outputKey = new Text();
+ outputKey.set(key.array(), key.offset(), key.length());
+ outputValue.set(value.array(), value.offset(), value.length());
+ writer.append(outputKey, outputValue);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (writer != null) {
+ writer.hflush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeQuietly(writer);
+ }
+}
[5/8] kylin git commit: KYLIN-1387 should support empty segment
Posted by sh...@apache.org.
KYLIN-1387 should support empty segment
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9352e5a9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9352e5a9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9352e5a9
Branch: refs/heads/2.x-staging
Commit: 9352e5a9e87e8b7bae41303ac1556ca7b7dc0023
Parents: 929c7a4
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 09:42:34 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:19:54 2016 +0800
----------------------------------------------------------------------
.../hbase/steps/SequenceFileCuboidWriter.java | 48 +++++++++++---------
1 file changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9352e5a9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
index 4d76522..8c2d5e4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -25,35 +25,39 @@ public class SequenceFileCuboidWriter extends KVGTRecordWriter {
public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
super(cubeDesc, segment);
+ try {
+ initiate();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
-
- @Override
- protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+ protected void initiate() throws IOException {
if (writer == null) {
- synchronized (SequenceFileCuboidWriter.class) {
- if (writer == null) {
- JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
- String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
- Path cuboidPath = new Path(cuboidRoot);
- FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
- try {
- if (fs.exists(cuboidPath)) {
- fs.delete(cuboidPath, true);
- }
-
- fs.mkdirs(cuboidPath);
- } finally {
- IOUtils.closeQuietly(fs);
- }
-
- Path cuboidFile = new Path(cuboidPath, "data.seq");
- logger.debug("Cuboid is written to " + cuboidFile);
- writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+ JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
+ String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+ Path cuboidPath = new Path(cuboidRoot);
+ FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+ try {
+ if (fs.exists(cuboidPath)) {
+ fs.delete(cuboidPath, true);
}
+
+ fs.mkdirs(cuboidPath);
+ } finally {
+ IOUtils.closeQuietly(fs);
}
+
+ Path cuboidFile = new Path(cuboidPath, "data.seq");
+ logger.debug("Cuboid is written to " + cuboidFile);
+ writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
}
+ }
+
+ @Override
+ protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+
Text outputValue = new Text();
Text outputKey = new Text();
outputKey.set(key.array(), key.offset(), key.length());
[6/8] kylin git commit: KYLIN-1421 Cube "source record" is always
zero for streaming
Posted by sh...@apache.org.
KYLIN-1421 Cube "source record" is always zero for streaming
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/50aab0b0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/50aab0b0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/50aab0b0
Branch: refs/heads/2.x-staging
Commit: 50aab0b08ea1532998db1f11084c1ae908a42e56
Parents: 9352e5a
Author: shaofengshi <sh...@apache.org>
Authored: Tue Feb 16 10:30:05 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 12:20:05 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/50aab0b0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index ec2ad91..b4182fe 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -117,6 +117,7 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
try {
CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+ segment.setInputRecords(streamingBatch.getMessages().size());
return segment;
} catch (IOException e) {
throw new RuntimeException("failed to create IBuildable", e);