You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/20 03:52:21 UTC
[1/5] incubator-kylin git commit: KYLIN-877 Add IMRInput javadoc
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-878 [created] 9a82f39bd
KYLIN-877 Add IMRInput javadoc
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2e89ea51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2e89ea51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2e89ea51
Branch: refs/heads/KYLIN-878
Commit: 2e89ea51dde565e9b308d6cbe5efae857a77e90d
Parents: ad7af7c
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Jul 16 11:27:13 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jul 17 10:29:04 2015 +0800
----------------------------------------------------------------------
.../kylin/engine/mr/BatchCubingJobBuilder.java | 2 ++
.../org/apache/kylin/engine/mr/IMRInput.java | 22 ++++++++++++++++++++
2 files changed, 24 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e89ea51/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index c1524e7..239ce64 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -49,9 +49,11 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
+ // Phase 2: Build Dictionary
result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
result.addTask(createBuildDictionaryStep(jobId));
+ // Phase 3: Build Cube
if (config.isInMemCubing()) {
result.addTask(createSaveStatisticsStep(jobId));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e89ea51/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index c170b47..08ed94a 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -23,25 +23,47 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.TableDesc;
+/**
+ * Any ITableSource that wishes to serve as input of MapReduce build engine must adapt to this interface.
+ */
public interface IMRInput {
+ /** Return a helper to participate in batch cubing job flow. */
public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+ /** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table);
+ /**
+ * Utility that configures mapper to read from a table.
+ */
public interface IMRTableInputFormat {
+ /** Configure the InputFormat of given job. */
public void configureJob(Job job);
+ /** Parse a mapper input object into column values. */
public String[] parseMapperInput(Object mapperInput);
}
+ /**
+ * Participate the batch cubing flow as the input side. Responsible for creating
+ * intermediate flat table (Phase 1) and clean up if necessary (Phase 4).
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube
+ * - Phase 4: Update Metadata & Cleanup
+ */
public interface IMRBatchCubingInputSide {
+ /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+ /** Add step that does necessary clean up, like delete the intermediate flat table */
public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
+ /** Return an InputFormat that reads from the intermediate flat table */
public IMRTableInputFormat getFlatTableInputFormat();
}
}
[2/5] incubator-kylin git commit: KYLIN-878 HBase storage abstraction for cubing flow
Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
deleted file mode 100644
index fdca42b..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * @author xjiang, yangli9
- */
-public class CubeStorageEngine implements ICachableStorageEngine {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeStorageEngine.class);
-
- private static final int MERGE_KEYRANGE_THRESHOLD = 100;
- private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
-
- private final CubeInstance cubeInstance;
- private final CubeDesc cubeDesc;
- private final String uuid;
-
- public CubeStorageEngine(CubeInstance cube) {
- this.cubeInstance = cube;
- this.cubeDesc = cube.getDescriptor();
- this.uuid = cube.getUuid();
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
- Collection<TblColRef> groups = sqlDigest.groupbyColumns;
- TupleFilter filter = sqlDigest.filter;
-
- // build dimension & metrics
- Collection<TblColRef> dimensions = new HashSet<TblColRef>();
- Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
- buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
-
- // all dimensions = groups + others
- Set<TblColRef> others = Sets.newHashSet(dimensions);
- others.removeAll(groups);
-
- // expand derived
- Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
- Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
- Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
- othersD.removeAll(groupsD);
- derivedPostAggregation.removeAll(groups);
-
- // identify cuboid
- Set<TblColRef> dimensionsD = Sets.newHashSet();
- dimensionsD.addAll(groupsD);
- dimensionsD.addAll(othersD);
- Cuboid cuboid = identifyCuboid(dimensionsD);
- context.setCuboid(cuboid);
-
- // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
- Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
- boolean isExactAggregation = isExactAggregation(cuboid, groups, othersD, singleValuesD, derivedPostAggregation);
- context.setExactAggregation(isExactAggregation);
-
- // translate filter for scan range and compose returning groups for coprocessor, note:
- // - columns on non-evaluatable filter have to return
- // - columns on loosened filter (due to derived translation) have to return
- Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
- collectNonEvaluable(filter, groupsCopD);
- TupleFilter filterD = translateDerived(filter, groupsCopD);
-
- // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
- TupleFilter flatFilter = flattenToOrAndFilter(filterD);
-
- // translate filter into segment scan ranges
- List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD);
-
- // check involved measures, build value decoder for each each family:column
- List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
-
- //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
- //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
- setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
- setLimit(filter, context);
-
- HConnection conn = HBaseConnection.get(context.getConnUrl());
- return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
- }
-
- @Override
- public Range<Long> getVolatilePeriod() {
- return null;
- }
-
- @Override
- public String getStorageUUID() {
- return this.uuid;
- }
-
- @Override
- public boolean isDynamic() {
- return false;
- }
-
- private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
-
- for (FunctionDesc func : sqlDigest.aggregations) {
- if (!func.isDimensionAsMetric()) {
- metrics.add(func);
- }
- }
-
- for (TblColRef column : sqlDigest.allColumns) {
- // skip measure columns
- if (sqlDigest.metricColumns.contains(column)) {
- continue;
- }
- dimensions.add(column);
- }
- }
-
- private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
- long cuboidID = 0;
- for (TblColRef column : dimensions) {
- int index = cubeDesc.getRowkey().getColumnBitIndex(column);
- cuboidID |= 1L << index;
- }
- return Cuboid.findById(cubeDesc, cuboidID);
- }
-
- private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
- boolean exact = true;
-
- if (cuboid.requirePostAggregation()) {
- exact = false;
- logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
- }
-
- // derived aggregation is bad, unless expanded columns are already in group by
- if (groups.containsAll(derivedPostAggregation) == false) {
- exact = false;
- logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
- }
-
- // other columns (from filter) is bad, unless they are ensured to have single value
- if (singleValuesD.containsAll(othersD) == false) {
- exact = false;
- logger.info("exactAggregation is false because some column not on group by: " + othersD //
- + " (single value column: " + singleValuesD + ")");
- }
-
- if (exact) {
- logger.info("exactAggregation is true");
- }
- return exact;
- }
-
- private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
- Set<TblColRef> expanded = Sets.newHashSet();
- for (TblColRef col : cols) {
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- for (TblColRef hostCol : hostInfo.columns) {
- expanded.add(hostCol);
- if (hostInfo.isOneToOne == false)
- derivedPostAggregation.add(hostCol);
- }
- } else {
- expanded.add(col);
- }
- }
- return expanded;
- }
-
- @SuppressWarnings("unchecked")
- private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
- Collection<? extends TupleFilter> toCheck;
- if (filter instanceof CompareTupleFilter) {
- toCheck = Collections.singleton(filter);
- } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
- toCheck = filter.getChildren();
- } else {
- return (Set<TblColRef>) Collections.EMPTY_SET;
- }
-
- Set<TblColRef> result = Sets.newHashSet();
- for (TupleFilter f : toCheck) {
- if (f instanceof CompareTupleFilter) {
- CompareTupleFilter compFilter = (CompareTupleFilter) f;
- // is COL=const ?
- if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
- result.add(compFilter.getColumn());
- }
- }
- }
-
- // expand derived
- Set<TblColRef> resultD = Sets.newHashSet();
- for (TblColRef col : result) {
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- if (hostInfo.isOneToOne) {
- for (TblColRef hostCol : hostInfo.columns) {
- resultD.add(hostCol);
- }
- }
- //if not one2one, it will be pruned
- } else {
- resultD.add(col);
- }
- }
- return resultD;
- }
-
- private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) {
- if (filter == null)
- return;
-
- if (filter.isEvaluable()) {
- for (TupleFilter child : filter.getChildren())
- collectNonEvaluable(child, collector);
- } else {
- collectColumnsRecursively(filter, collector);
- }
- }
-
- private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
- if (filter instanceof ColumnTupleFilter) {
- collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
- }
- for (TupleFilter child : filter.getChildren()) {
- collectColumnsRecursively(child, collector);
- }
- }
-
- private void collectColumns(TblColRef col, Set<TblColRef> collector) {
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- for (TblColRef h : hostInfo.columns)
- collector.add(h);
- } else {
- collector.add(col);
- }
- }
-
- @SuppressWarnings("unchecked")
- private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
- if (filter == null)
- return filter;
-
- if (filter instanceof CompareTupleFilter) {
- return translateDerivedInCompare((CompareTupleFilter) filter, collector);
- }
-
- List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
- List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
- boolean modified = false;
- for (TupleFilter child : children) {
- TupleFilter translated = translateDerived(child, collector);
- newChildren.add(translated);
- if (child != translated)
- modified = true;
- }
- if (modified) {
- filter = replaceChildren(filter, newChildren);
- }
- return filter;
- }
-
- private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
- if (filter instanceof LogicalTupleFilter) {
- LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
- r.addChildren(newChildren);
- return r;
- } else
- throw new IllegalStateException("Cannot replaceChildren on " + filter);
- }
-
- private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
- if (compf.getColumn() == null || compf.getValues().isEmpty())
- return compf;
-
- TblColRef derived = compf.getColumn();
- if (cubeDesc.isDerived(derived) == false)
- return compf;
-
- DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
- CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
- CubeSegment seg = cubeInstance.getLatestReadySegment();
- LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
- Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
- TupleFilter translatedFilter = translated.getFirst();
- boolean loosened = translated.getSecond();
- if (loosened) {
- collectColumnsRecursively(translatedFilter, collector);
- }
- return translatedFilter;
- }
-
- private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, //
- StorageContext context) {
- Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap();
- for (FunctionDesc aggrFunc : metrics) {
- Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc);
- if (hbCols.isEmpty()) {
- throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression());
- }
- HBaseColumnDesc bestHBCol = null;
- int bestIndex = -1;
- for (HBaseColumnDesc hbCol : hbCols) {
- bestHBCol = hbCol;
- bestIndex = hbCol.findMeasureIndex(aggrFunc);
- MeasureDesc measure = hbCol.getMeasures()[bestIndex];
- // criteria for holistic measure: Exact Aggregation && Exact Cuboid
- if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
- logger.info("Holistic count distinct chosen for " + aggrFunc);
- break;
- }
- }
-
- RowValueDecoder codec = codecMap.get(bestHBCol);
- if (codec == null) {
- codec = new RowValueDecoder(bestHBCol);
- codecMap.put(bestHBCol, codec);
- }
- codec.setIndex(bestIndex);
- }
- return new ArrayList<RowValueDecoder>(codecMap.values());
- }
-
- private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
- if (filter == null)
- return null;
-
- TupleFilter flatFilter = filter.flatFilter();
-
- // normalize to OR-AND filter
- if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
- LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
- f.addChild(flatFilter);
- flatFilter = f;
- }
-
- if (flatFilter.getOperator() != FilterOperatorEnum.OR)
- throw new IllegalStateException();
-
- return flatFilter;
- }
-
- private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) {
-
- List<HBaseKeyRange> result = Lists.newArrayList();
-
- logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all");
- List<CubeSegment> segs = cubeInstance.getSegments(SegmentStatusEnum.READY);
- logger.info("READY segs count: " + segs.size());
-
- // build row key range for each cube segment
- for (CubeSegment cubeSeg : segs) {
-
- // consider derived (lookup snapshot), filter on dimension may
- // differ per segment
- List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);
- if (orAndDimRanges == null) { // has conflict
- continue;
- }
-
- List<HBaseKeyRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
- for (Collection<ColumnValueRange> andDimRanges : orAndDimRanges) {
- HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc);
- scanRanges.add(rowKeyRange);
- }
-
- List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
- mergedRanges = mergeTooManyRanges(mergedRanges);
- result.addAll(mergedRanges);
- }
-
- logger.info("hbasekeyrange count: " + result.size());
- dropUnhitSegments(result);
- logger.info("hbasekeyrange count after dropping unhit :" + result.size());
-
- return result;
- }
-
- private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) {
- List<Collection<ColumnValueRange>> result = Lists.newArrayList();
-
- if (flatFilter == null) {
- result.add(Collections.<ColumnValueRange> emptyList());
- return result;
- }
-
- for (TupleFilter andFilter : flatFilter.getChildren()) {
- if (andFilter.getOperator() != FilterOperatorEnum.AND) {
- throw new IllegalStateException("Filter should be AND instead of " + andFilter);
- }
-
- Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
-
- result.add(andRanges);
- }
-
- return preprocessConstantConditions(result);
- }
-
- private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) {
- boolean globalAlwaysTrue = false;
- Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator();
- while (iterator.hasNext()) {
- Collection<ColumnValueRange> andRanges = iterator.next();
- Iterator<ColumnValueRange> iterator2 = andRanges.iterator();
- boolean hasAlwaysFalse = false;
- while (iterator2.hasNext()) {
- ColumnValueRange range = iterator2.next();
- if (range.satisfyAll())
- iterator2.remove();
- else if (range.satisfyNone())
- hasAlwaysFalse = true;
- }
- if (hasAlwaysFalse) {
- iterator.remove();
- } else if (andRanges.isEmpty()) {
- globalAlwaysTrue = true;
- break;
- }
- }
- if (globalAlwaysTrue) {
- orAndRanges.clear();
- orAndRanges.add(Collections.<ColumnValueRange> emptyList());
- }
- return orAndRanges;
- }
-
- // return empty collection to mean true; return null to mean false
- @SuppressWarnings("unchecked")
- private Collection<ColumnValueRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters, CubeSegment cubeSegment) {
- Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>();
- for (TupleFilter filter : andFilters) {
- if ((filter instanceof CompareTupleFilter) == false) {
- continue;
- }
-
- CompareTupleFilter comp = (CompareTupleFilter) filter;
- if (comp.getColumn() == null) {
- continue;
- }
-
- ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator());
- andMerge(range, rangeMap);
- }
-
- // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary
- Iterator<ColumnValueRange> it = rangeMap.values().iterator();
- while (it.hasNext()) {
- ColumnValueRange range = it.next();
- range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn()));
- if (range.satisfyAll())
- it.remove();
- else if (range.satisfyNone())
- return null;
- }
-
- return rangeMap.values();
- }
-
- private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) {
- ColumnValueRange columnRange = rangeMap.get(range.getColumn());
- if (columnRange == null) {
- rangeMap.put(range.getColumn(), range);
- } else {
- columnRange.andMerge(range);
- }
- }
-
- private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) {
- if (keyRanges.size() <= 1) {
- return keyRanges;
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Merging key range from " + keyRanges.size());
- }
-
- // sort ranges by start key
- Collections.sort(keyRanges);
-
- // merge the overlap range
- List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
- int beginIndex = 0;
- byte[] maxStopKey = keyRanges.get(0).getStopKey();
- for (int index = 0; index < keyRanges.size(); index++) {
- HBaseKeyRange keyRange = keyRanges.get(index);
- if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) {
- // merge the current key ranges
- HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1);
- mergedRanges.add(mergedRange);
- // start new merge
- beginIndex = index;
- }
- if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) {
- // update the stop key
- maxStopKey = keyRange.getStopKey();
- }
- }
- // merge last range
- HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1);
- mergedRanges.add(mergedRange);
- if (logger.isDebugEnabled()) {
- logger.debug("Merging key range to " + mergedRanges.size());
- }
- return mergedRanges;
- }
-
- private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) {
- HBaseKeyRange keyRange = keyRanges.get(from);
- int mergeSize = to - from + 1;
- if (mergeSize > 1) {
- // merge range from mergeHeader to i - 1
- CubeSegment cubeSegment = keyRange.getCubeSegment();
- Cuboid cuboid = keyRange.getCuboid();
- byte[] startKey = keyRange.getStartKey();
- byte[] stopKey = keyRange.getStopKey();
- long partitionColumnStartDate = Long.MAX_VALUE;
- long partitionColumnEndDate = 0;
- List<Pair<byte[], byte[]>> newFuzzyKeys = new ArrayList<Pair<byte[], byte[]>>(mergeSize);
- List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
-
- boolean hasNonFuzzyRange = false;
- for (int k = from; k <= to; k++) {
- HBaseKeyRange nextRange = keyRanges.get(k);
- hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty();
- newFuzzyKeys.addAll(nextRange.getFuzzyKeys());
- newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter());
- if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) {
- stopKey = nextRange.getStopKey();
- }
- if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) {
- partitionColumnStartDate = nextRange.getPartitionColumnStartDate();
- }
- if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) {
- partitionColumnEndDate = nextRange.getPartitionColumnEndDate();
- }
- }
-
- // if any range is non-fuzzy, then all fuzzy keys must be cleared
- if (hasNonFuzzyRange) {
- newFuzzyKeys.clear();
- }
-
- partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
- partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
- keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, newFuzzyKeys, newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
- }
- return keyRange;
- }
-
- private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) {
- if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) {
- return keyRanges;
- }
- // TODO: check the distance between range. and merge the large distance range
- List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
- HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1);
- mergedRanges.add(mergedRange);
- return mergedRanges;
- }
-
- private void dropUnhitSegments(List<HBaseKeyRange> scans) {
- if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) {
- Iterator<HBaseKeyRange> iterator = scans.iterator();
- while (iterator.hasNext()) {
- HBaseKeyRange scan = iterator.next();
- if (scan.hitSegment() == false) {
- iterator.remove();
- }
- }
- }
- }
-
- private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
- if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
- return;
- }
-
- int rowSizeEst = dimensions.size() * 3;
- for (RowValueDecoder decoder : valueDecoders) {
- MeasureDesc[] measures = decoder.getMeasures();
- BitSet projectionIndex = decoder.getProjectionIndex();
- for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
- FunctionDesc func = measures[i].getFunction();
- rowSizeEst += func.getReturnDataType().getSpaceEstimate();
- }
- }
-
- long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
- context.setThreshold((int) rowEst);
- }
-
- private void setLimit(TupleFilter filter, StorageContext context) {
- boolean goodAggr = context.isExactAggregation();
- boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
- boolean goodSort = context.hasSort() == false;
- if (goodAggr && goodFilter && goodSort) {
- logger.info("Enable limit " + context.getLimit());
- context.enableLimit();
- }
- }
-
- private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) {
- ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
new file mode 100644
index 0000000..1d76bd4
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowValueDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseMappingDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * @author xjiang, yangli9
+ */
+public class CubeStorageQuery implements ICachableStorageQuery {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
+
+ private static final int MERGE_KEYRANGE_THRESHOLD = 100;
+ private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
+
+ private final CubeInstance cubeInstance;
+ private final CubeDesc cubeDesc;
+ private final String uuid;
+
+ public CubeStorageQuery(CubeInstance cube) {
+ this.cubeInstance = cube;
+ this.cubeDesc = cube.getDescriptor();
+ this.uuid = cube.getUuid();
+ }
+
+ @Override
+ public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+
+ Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TupleFilter filter = sqlDigest.filter;
+
+ // build dimension & metrics
+ Collection<TblColRef> dimensions = new HashSet<TblColRef>();
+ Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
+ buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
+
+ // all dimensions = groups + others
+ Set<TblColRef> others = Sets.newHashSet(dimensions);
+ others.removeAll(groups);
+
+ // expand derived
+ Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+ Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+ Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
+ othersD.removeAll(groupsD);
+ derivedPostAggregation.removeAll(groups);
+
+ // identify cuboid
+ Set<TblColRef> dimensionsD = Sets.newHashSet();
+ dimensionsD.addAll(groupsD);
+ dimensionsD.addAll(othersD);
+ Cuboid cuboid = identifyCuboid(dimensionsD);
+ context.setCuboid(cuboid);
+
+ // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
+ Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+ boolean isExactAggregation = isExactAggregation(cuboid, groups, othersD, singleValuesD, derivedPostAggregation);
+ context.setExactAggregation(isExactAggregation);
+
+ // translate filter for scan range and compose returning groups for coprocessor, note:
+ // - columns on non-evaluatable filter have to return
+ // - columns on loosened filter (due to derived translation) have to return
+ Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
+ collectNonEvaluable(filter, groupsCopD);
+ TupleFilter filterD = translateDerived(filter, groupsCopD);
+
+ // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
+ TupleFilter flatFilter = flattenToOrAndFilter(filterD);
+
+ // translate filter into segment scan ranges
+ List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD);
+
+ // check involved measures, build value decoder for each each family:column
+ List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
+
+ //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
+ //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+ setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
+ setLimit(filter, context);
+
+ HConnection conn = HBaseConnection.get(context.getConnUrl());
+ return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
+ }
+
+ @Override
+ public Range<Long> getVolatilePeriod() {
+ return null;
+ }
+
+ @Override
+ public String getStorageUUID() {
+ return this.uuid;
+ }
+
+ @Override
+ public boolean isDynamic() {
+ return false;
+ }
+
+ private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
+
+ for (FunctionDesc func : sqlDigest.aggregations) {
+ if (!func.isDimensionAsMetric()) {
+ metrics.add(func);
+ }
+ }
+
+ for (TblColRef column : sqlDigest.allColumns) {
+ // skip measure columns
+ if (sqlDigest.metricColumns.contains(column)) {
+ continue;
+ }
+ dimensions.add(column);
+ }
+ }
+
+ private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+ long cuboidID = 0;
+ for (TblColRef column : dimensions) {
+ int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+ cuboidID |= 1L << index;
+ }
+ return Cuboid.findById(cubeDesc, cuboidID);
+ }
+
+ private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+ boolean exact = true;
+
+ if (cuboid.requirePostAggregation()) {
+ exact = false;
+ logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+ }
+
+ // derived aggregation is bad, unless expanded columns are already in group by
+ if (groups.containsAll(derivedPostAggregation) == false) {
+ exact = false;
+ logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+ }
+
+ // other columns (from filter) is bad, unless they are ensured to have single value
+ if (singleValuesD.containsAll(othersD) == false) {
+ exact = false;
+ logger.info("exactAggregation is false because some column not on group by: " + othersD //
+ + " (single value column: " + singleValuesD + ")");
+ }
+
+ if (exact) {
+ logger.info("exactAggregation is true");
+ }
+ return exact;
+ }
+
+ private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+ Set<TblColRef> expanded = Sets.newHashSet();
+ for (TblColRef col : cols) {
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ for (TblColRef hostCol : hostInfo.columns) {
+ expanded.add(hostCol);
+ if (hostInfo.isOneToOne == false)
+ derivedPostAggregation.add(hostCol);
+ }
+ } else {
+ expanded.add(col);
+ }
+ }
+ return expanded;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+ Collection<? extends TupleFilter> toCheck;
+ if (filter instanceof CompareTupleFilter) {
+ toCheck = Collections.singleton(filter);
+ } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+ toCheck = filter.getChildren();
+ } else {
+ return (Set<TblColRef>) Collections.EMPTY_SET;
+ }
+
+ Set<TblColRef> result = Sets.newHashSet();
+ for (TupleFilter f : toCheck) {
+ if (f instanceof CompareTupleFilter) {
+ CompareTupleFilter compFilter = (CompareTupleFilter) f;
+ // is COL=const ?
+ if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+ result.add(compFilter.getColumn());
+ }
+ }
+ }
+
+ // expand derived
+ Set<TblColRef> resultD = Sets.newHashSet();
+ for (TblColRef col : result) {
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ if (hostInfo.isOneToOne) {
+ for (TblColRef hostCol : hostInfo.columns) {
+ resultD.add(hostCol);
+ }
+ }
+ //if not one2one, it will be pruned
+ } else {
+ resultD.add(col);
+ }
+ }
+ return resultD;
+ }
+
+ private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter == null)
+ return;
+
+ if (filter.isEvaluable()) {
+ for (TupleFilter child : filter.getChildren())
+ collectNonEvaluable(child, collector);
+ } else {
+ collectColumnsRecursively(filter, collector);
+ }
+ }
+
+ private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter instanceof ColumnTupleFilter) {
+ collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+ }
+ for (TupleFilter child : filter.getChildren()) {
+ collectColumnsRecursively(child, collector);
+ }
+ }
+
+ private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ for (TblColRef h : hostInfo.columns)
+ collector.add(h);
+ } else {
+ collector.add(col);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter == null)
+ return filter;
+
+ if (filter instanceof CompareTupleFilter) {
+ return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+ }
+
+ List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
+ List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
+ boolean modified = false;
+ for (TupleFilter child : children) {
+ TupleFilter translated = translateDerived(child, collector);
+ newChildren.add(translated);
+ if (child != translated)
+ modified = true;
+ }
+ if (modified) {
+ filter = replaceChildren(filter, newChildren);
+ }
+ return filter;
+ }
+
+ private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+ if (filter instanceof LogicalTupleFilter) {
+ LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
+ r.addChildren(newChildren);
+ return r;
+ } else
+ throw new IllegalStateException("Cannot replaceChildren on " + filter);
+ }
+
+ private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+ if (compf.getColumn() == null || compf.getValues().isEmpty())
+ return compf;
+
+ TblColRef derived = compf.getColumn();
+ if (cubeDesc.isDerived(derived) == false)
+ return compf;
+
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+ CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
+ CubeSegment seg = cubeInstance.getLatestReadySegment();
+ LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
+ Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+ TupleFilter translatedFilter = translated.getFirst();
+ boolean loosened = translated.getSecond();
+ if (loosened) {
+ collectColumnsRecursively(translatedFilter, collector);
+ }
+ return translatedFilter;
+ }
+
+ private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, //
+ StorageContext context) {
+ Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap();
+ for (FunctionDesc aggrFunc : metrics) {
+ Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc);
+ if (hbCols.isEmpty()) {
+ throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression());
+ }
+ HBaseColumnDesc bestHBCol = null;
+ int bestIndex = -1;
+ for (HBaseColumnDesc hbCol : hbCols) {
+ bestHBCol = hbCol;
+ bestIndex = hbCol.findMeasureIndex(aggrFunc);
+ MeasureDesc measure = hbCol.getMeasures()[bestIndex];
+ // criteria for holistic measure: Exact Aggregation && Exact Cuboid
+ if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
+ logger.info("Holistic count distinct chosen for " + aggrFunc);
+ break;
+ }
+ }
+
+ RowValueDecoder codec = codecMap.get(bestHBCol);
+ if (codec == null) {
+ codec = new RowValueDecoder(bestHBCol);
+ codecMap.put(bestHBCol, codec);
+ }
+ codec.setIndex(bestIndex);
+ }
+ return new ArrayList<RowValueDecoder>(codecMap.values());
+ }
+
+ private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
+ if (filter == null)
+ return null;
+
+ TupleFilter flatFilter = filter.flatFilter();
+
+ // normalize to OR-AND filter
+ if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
+ LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
+ f.addChild(flatFilter);
+ flatFilter = f;
+ }
+
+ if (flatFilter.getOperator() != FilterOperatorEnum.OR)
+ throw new IllegalStateException();
+
+ return flatFilter;
+ }
+
+ private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) {
+
+ List<HBaseKeyRange> result = Lists.newArrayList();
+
+ logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all");
+ List<CubeSegment> segs = cubeInstance.getSegments(SegmentStatusEnum.READY);
+ logger.info("READY segs count: " + segs.size());
+
+ // build row key range for each cube segment
+ for (CubeSegment cubeSeg : segs) {
+
+ // consider derived (lookup snapshot), filter on dimension may
+ // differ per segment
+ List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg);
+ if (orAndDimRanges == null) { // has conflict
+ continue;
+ }
+
+ List<HBaseKeyRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size());
+ for (Collection<ColumnValueRange> andDimRanges : orAndDimRanges) {
+ HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc);
+ scanRanges.add(rowKeyRange);
+ }
+
+ List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
+ mergedRanges = mergeTooManyRanges(mergedRanges);
+ result.addAll(mergedRanges);
+ }
+
+ logger.info("hbasekeyrange count: " + result.size());
+ dropUnhitSegments(result);
+ logger.info("hbasekeyrange count after dropping unhit :" + result.size());
+
+ return result;
+ }
+
+ private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) {
+ List<Collection<ColumnValueRange>> result = Lists.newArrayList();
+
+ if (flatFilter == null) {
+ result.add(Collections.<ColumnValueRange> emptyList());
+ return result;
+ }
+
+ for (TupleFilter andFilter : flatFilter.getChildren()) {
+ if (andFilter.getOperator() != FilterOperatorEnum.AND) {
+ throw new IllegalStateException("Filter should be AND instead of " + andFilter);
+ }
+
+ Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
+
+ result.add(andRanges);
+ }
+
+ return preprocessConstantConditions(result);
+ }
+
+ private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) {
+ boolean globalAlwaysTrue = false;
+ Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator();
+ while (iterator.hasNext()) {
+ Collection<ColumnValueRange> andRanges = iterator.next();
+ Iterator<ColumnValueRange> iterator2 = andRanges.iterator();
+ boolean hasAlwaysFalse = false;
+ while (iterator2.hasNext()) {
+ ColumnValueRange range = iterator2.next();
+ if (range.satisfyAll())
+ iterator2.remove();
+ else if (range.satisfyNone())
+ hasAlwaysFalse = true;
+ }
+ if (hasAlwaysFalse) {
+ iterator.remove();
+ } else if (andRanges.isEmpty()) {
+ globalAlwaysTrue = true;
+ break;
+ }
+ }
+ if (globalAlwaysTrue) {
+ orAndRanges.clear();
+ orAndRanges.add(Collections.<ColumnValueRange> emptyList());
+ }
+ return orAndRanges;
+ }
+
+ // return empty collection to mean true; return null to mean false
+ @SuppressWarnings("unchecked")
+ private Collection<ColumnValueRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters, CubeSegment cubeSegment) {
+ Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>();
+ for (TupleFilter filter : andFilters) {
+ if ((filter instanceof CompareTupleFilter) == false) {
+ continue;
+ }
+
+ CompareTupleFilter comp = (CompareTupleFilter) filter;
+ if (comp.getColumn() == null) {
+ continue;
+ }
+
+ ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator());
+ andMerge(range, rangeMap);
+ }
+
+ // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary
+ Iterator<ColumnValueRange> it = rangeMap.values().iterator();
+ while (it.hasNext()) {
+ ColumnValueRange range = it.next();
+ range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn()));
+ if (range.satisfyAll())
+ it.remove();
+ else if (range.satisfyNone())
+ return null;
+ }
+
+ return rangeMap.values();
+ }
+
+ private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) {
+ ColumnValueRange columnRange = rangeMap.get(range.getColumn());
+ if (columnRange == null) {
+ rangeMap.put(range.getColumn(), range);
+ } else {
+ columnRange.andMerge(range);
+ }
+ }
+
+ private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) {
+ if (keyRanges.size() <= 1) {
+ return keyRanges;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Merging key range from " + keyRanges.size());
+ }
+
+ // sort ranges by start key
+ Collections.sort(keyRanges);
+
+ // merge the overlap range
+ List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
+ int beginIndex = 0;
+ byte[] maxStopKey = keyRanges.get(0).getStopKey();
+ for (int index = 0; index < keyRanges.size(); index++) {
+ HBaseKeyRange keyRange = keyRanges.get(index);
+ if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) {
+ // merge the current key ranges
+ HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1);
+ mergedRanges.add(mergedRange);
+ // start new merge
+ beginIndex = index;
+ }
+ if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) {
+ // update the stop key
+ maxStopKey = keyRange.getStopKey();
+ }
+ }
+ // merge last range
+ HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1);
+ mergedRanges.add(mergedRange);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Merging key range to " + mergedRanges.size());
+ }
+ return mergedRanges;
+ }
+
+ private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) {
+ HBaseKeyRange keyRange = keyRanges.get(from);
+ int mergeSize = to - from + 1;
+ if (mergeSize > 1) {
+ // merge range from mergeHeader to i - 1
+ CubeSegment cubeSegment = keyRange.getCubeSegment();
+ Cuboid cuboid = keyRange.getCuboid();
+ byte[] startKey = keyRange.getStartKey();
+ byte[] stopKey = keyRange.getStopKey();
+ long partitionColumnStartDate = Long.MAX_VALUE;
+ long partitionColumnEndDate = 0;
+ List<Pair<byte[], byte[]>> newFuzzyKeys = new ArrayList<Pair<byte[], byte[]>>(mergeSize);
+ List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
+
+ boolean hasNonFuzzyRange = false;
+ for (int k = from; k <= to; k++) {
+ HBaseKeyRange nextRange = keyRanges.get(k);
+ hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty();
+ newFuzzyKeys.addAll(nextRange.getFuzzyKeys());
+ newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter());
+ if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) {
+ stopKey = nextRange.getStopKey();
+ }
+ if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) {
+ partitionColumnStartDate = nextRange.getPartitionColumnStartDate();
+ }
+ if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) {
+ partitionColumnEndDate = nextRange.getPartitionColumnEndDate();
+ }
+ }
+
+ // if any range is non-fuzzy, then all fuzzy keys must be cleared
+ if (hasNonFuzzyRange) {
+ newFuzzyKeys.clear();
+ }
+
+ partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
+ partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
+ keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, newFuzzyKeys, newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
+ }
+ return keyRange;
+ }
+
+ private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) {
+ if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) {
+ return keyRanges;
+ }
+ // TODO: check the distance between range. and merge the large distance range
+ List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
+ HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1);
+ mergedRanges.add(mergedRange);
+ return mergedRanges;
+ }
+
+ private void dropUnhitSegments(List<HBaseKeyRange> scans) {
+ if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) {
+ Iterator<HBaseKeyRange> iterator = scans.iterator();
+ while (iterator.hasNext()) {
+ HBaseKeyRange scan = iterator.next();
+ if (scan.hitSegment() == false) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
+ if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
+ return;
+ }
+
+ int rowSizeEst = dimensions.size() * 3;
+ for (RowValueDecoder decoder : valueDecoders) {
+ MeasureDesc[] measures = decoder.getMeasures();
+ BitSet projectionIndex = decoder.getProjectionIndex();
+ for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
+ FunctionDesc func = measures[i].getFunction();
+ rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+ }
+ }
+
+ long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
+ context.setThreshold((int) rowEst);
+ }
+
+ private void setLimit(TupleFilter filter, StorageContext context) {
+ boolean goodAggr = context.isExactAggregation();
+ boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+ boolean goodSort = context.hasSort() == false;
+ if (goodAggr && goodFilter && goodSort) {
+ logger.info("Enable limit " + context.getLimit());
+ context.enableLimit();
+ }
+ }
+
+ private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) {
+ ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
deleted file mode 100644
index de85190..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase;
-
-import com.google.common.collect.Range;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexStorageEngine implements ICachableStorageEngine {
-
- private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageEngine.class);
-
- private IISegment seg;
- private String uuid;
- private EndpointTupleIterator dataIterator;
-
- public InvertedIndexStorageEngine(IIInstance ii) {
- this.seg = ii.getFirstSegment();
- this.uuid = ii.getUuid();
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- String tableName = seg.getStorageLocationIdentifier();
-
- //HConnection is cached, so need not be closed
- @SuppressWarnings("deprecation")
- HConnection conn = HBaseConnection.get(context.getConnUrl());
- try {
- dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
- return dataIterator;
- } catch (Throwable e) {
- logger.error("Error when connecting to II htable " + tableName, e);
- throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
- }
- }
-
- @Override
- public Range<Long> getVolatilePeriod() {
- return dataIterator.getCacheExcludedPeriod();
- }
-
- @Override
- public String getStorageUUID() {
- return this.uuid;
- }
-
- @Override
- public boolean isDynamic() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java
new file mode 100644
index 0000000..762258d
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import com.google.common.collect.Range;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexStorageQuery implements ICachableStorageQuery {
+
+ private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageQuery.class);
+
+ private IISegment seg;
+ private String uuid;
+ private EndpointTupleIterator dataIterator;
+
+ public InvertedIndexStorageQuery(IIInstance ii) {
+ this.seg = ii.getFirstSegment();
+ this.uuid = ii.getUuid();
+ }
+
+ @Override
+ public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ String tableName = seg.getStorageLocationIdentifier();
+
+ //HConnection is cached, so need not be closed
+ HConnection conn = HBaseConnection.get(context.getConnUrl());
+ try {
+ dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo);
+ return dataIterator;
+ } catch (Throwable e) {
+ logger.error("Error when connecting to II htable " + tableName, e);
+ throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
+ }
+ }
+
+ @Override
+ public Range<Long> getVolatilePeriod() {
+ return dataIterator.getCacheExcludedPeriod();
+ }
+
+ @Override
+ public String getStorageUUID() {
+ return this.uuid;
+ }
+
+ @Override
+ public boolean isDynamic() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 3243698..059f75b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -7,7 +7,7 @@ import org.apache.kylin.metadata.tuple.CompoundTupleIterator;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.StorageFactory;
import org.apache.kylin.storage.tuple.TupleInfo;
import java.util.List;
@@ -23,7 +23,7 @@ public class HybridStorageEngine implements IStorageQuery {
this.realizations = hybridInstance.getRealizations();
storageEngines = new IStorageQuery[realizations.length];
for (int i = 0; i < realizations.length; i++) {
- storageEngines[i] = StorageEngineFactory.getStorageEngine(realizations[i]);
+ storageEngines[i] = StorageFactory.createQuery(realizations[i]);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java b/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
index 3814902..46b5588 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/DynamicCacheTest.java
@@ -12,7 +12,7 @@ import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
@@ -87,7 +87,7 @@ public class DynamicCacheTest {
final AtomicInteger underlyingSEHitCount = new AtomicInteger(0);
final List<Integer> returnedRowPerSearch = Lists.newArrayList();
- CacheFledgedDynamicStorageEngine dynamicCache = new CacheFledgedDynamicStorageEngine(new ICachableStorageEngine() {
+ CacheFledgedDynamicStorageEngine dynamicCache = new CacheFledgedDynamicStorageEngine(new ICachableStorageQuery() {
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
Range<Long> tsRagneInQuery = TsConditionExtractor.extractTsCondition(partitionCol, sqlDigest.filter);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
index 2d75535..4408b55 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/ITStorageTest.java
@@ -30,7 +30,7 @@ import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.StorageFactory;
import org.apache.kylin.storage.hbase.ScanOutOfLimitException;
import org.junit.*;
@@ -60,7 +60,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
cube = cubeMgr.getCube("TEST_KYLIN_CUBE_WITHOUT_SLR_EMPTY");
Assert.assertNotNull(cube);
- storageEngine = StorageEngineFactory.getStorageEngine(cube);
+ storageEngine = StorageFactory.createQuery(cube);
String url = KylinConfig.getInstanceFromEnv().getStorageUrl();
context = new StorageContext();
context.setConnUrl(url);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java b/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
index 112f612..6889668 100644
--- a/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/test/StaticCacheTest.java
@@ -10,7 +10,7 @@ import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
import org.apache.kylin.storage.tuple.Tuple;
@@ -45,7 +45,7 @@ public class StaticCacheTest {
final AtomicInteger underlyingSEHitCount = new AtomicInteger(0);
- CacheFledgedStaticStorageEngine cacheFledgedStaticStorageEngine = new CacheFledgedStaticStorageEngine(new ICachableStorageEngine() {
+ CacheFledgedStaticStorageEngine cacheFledgedStaticStorageEngine = new CacheFledgedStaticStorageEngine(new ICachableStorageQuery() {
@Override
public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
underlyingSEHitCount.incrementAndGet();
[4/5] incubator-kylin git commit: KYLIN-878 HBase storage abstraction
for cubing flow
Posted by li...@apache.org.
KYLIN-878 HBase storage abstraction for cubing flow
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4d3af3a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4d3af3a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4d3af3a9
Branch: refs/heads/KYLIN-878
Commit: 4d3af3a91686eb638e0b5868986489626a83dffa
Parents: 2e89ea5
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jul 17 10:27:43 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jul 17 10:29:08 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/engine/BuildEngineFactory.java | 20 +-
.../apache/kylin/engine/IBatchCubingEngine.java | 3 +
.../kylin/engine/mr/BatchCubingJobBuilder.java | 137 +---
.../kylin/engine/mr/BatchMergeJobBuilder.java | 98 +--
.../kylin/engine/mr/GarbageCollectionStep.java | 149 -----
.../org/apache/kylin/engine/mr/IMRInput.java | 4 +-
.../kylin/engine/mr/IMRJobFlowParticipant.java | 34 -
.../org/apache/kylin/engine/mr/IMROutput.java | 46 +-
.../kylin/engine/mr/JobBuilderSupport.java | 152 ++---
.../kylin/engine/mr/MRBatchCubingEngine.java | 29 +-
.../java/org/apache/kylin/engine/mr/MRUtil.java | 45 ++
.../engine/mr2/BatchCubingJobBuilder2.java | 102 +++
.../kylin/engine/mr2/BatchMergeJobBuilder2.java | 107 +++
.../org/apache/kylin/engine/mr2/IMROutput2.java | 5 +
.../kylin/engine/mr2/MRBatchCubingEngine2.java | 48 ++
.../kylin/job/engine/JobEngineConfig.java | 4 -
.../cardinality/ColumnCardinalityMapper.java | 4 +-
.../cardinality/HiveColumnCardinalityJob.java | 4 +-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 4 +-
.../cube/FactDistinctColumnsMapperBase.java | 4 +-
.../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 4 +-
.../job/hadoop/cubev2/InMemCuboidMapper.java | 4 +-
.../apache/kylin/source/hive/HiveMRInput.java | 2 +-
.../java/org/apache/kylin/storage/IStorage.java | 28 -
.../apache/kylin/storage/StorageFactory2.java | 34 +
.../kylin/storage/hbase/HBaseMROutput.java | 38 +-
.../kylin/storage/hbase/HBaseMRSteps.java | 135 ++++
.../apache/kylin/storage/hbase/MergeGCStep.java | 121 ++++
.../kylin/query/enumerator/OLAPEnumerator.java | 4 +-
.../kylin/storage/ICachableStorageEngine.java | 34 -
.../kylin/storage/ICachableStorageQuery.java | 33 +
.../java/org/apache/kylin/storage/IStorage.java | 28 +
.../kylin/storage/StorageEngineFactory.java | 83 ---
.../apache/kylin/storage/StorageFactory.java | 85 +++
.../AbstractCacheFledgedStorageEngine.java | 6 +-
.../cache/CacheFledgedDynamicStorageEngine.java | 4 +-
.../cache/CacheFledgedStaticStorageEngine.java | 4 +-
.../kylin/storage/cube/CubeStorageEngine.java | 371 -----------
.../kylin/storage/cube/CubeStorageQuery.java | 371 +++++++++++
.../kylin/storage/hbase/CubeStorageEngine.java | 663 -------------------
.../kylin/storage/hbase/CubeStorageQuery.java | 663 +++++++++++++++++++
.../hbase/InvertedIndexStorageEngine.java | 83 ---
.../hbase/InvertedIndexStorageQuery.java | 82 +++
.../storage/hybrid/HybridStorageEngine.java | 4 +-
.../kylin/storage/test/DynamicCacheTest.java | 4 +-
.../kylin/storage/test/ITStorageTest.java | 4 +-
.../kylin/storage/test/StaticCacheTest.java | 4 +-
47 files changed, 2094 insertions(+), 1801 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index c536deb..721b85a 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -18,22 +18,36 @@
package org.apache.kylin.engine;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr2.MRBatchCubingEngine2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
public class BuildEngineFactory {
- private static final IBatchCubingEngine defaultBatch = new MRBatchCubingEngine();
+ private static IBatchCubingEngine defaultBatchEngine;
+
+ public static IBatchCubingEngine defaultBatchEngine() {
+ if (defaultBatchEngine == null) {
+ KylinConfig conf = KylinConfig.getInstanceFromEnv();
+ if (conf.isCubingInMem()) {
+ defaultBatchEngine = new MRBatchCubingEngine2();
+ } else {
+ defaultBatchEngine = new MRBatchCubingEngine();
+ }
+ }
+ return defaultBatchEngine;
+ }
/** Build a new cube segment, typically its time range appends to the end of current cube. */
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return defaultBatch.createBatchCubingJob(newSegment, submitter);
+ return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
}
/** Merge multiple small segments into a big one. */
public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
- return defaultBatch.createBatchMergeJob(mergeSegment, submitter);
+ return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 55a080c..904f557 100644
--- a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -29,4 +29,7 @@ public interface IBatchCubingEngine {
/** Merge multiple small segments into a big one. */
public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+ public Class<?> getSourceInterface();
+
+ public Class<?> getStorageInterface();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 239ce64..0ff3bef 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -21,108 +21,57 @@ package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
public class BatchCubingJobBuilder extends JobBuilderSupport {
private final IMRBatchCubingInputSide inputSide;
-
+ private final IMRBatchCubingOutputSide outputSide;
+
public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
- this.inputSide = MRBatchCubingEngine.getBatchCubingInputSide(seg);
+ this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
}
public CubingJob build() {
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
-
+
// Phase 2: Build Dictionary
result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
result.addTask(createBuildDictionaryStep(jobId));
-
+
// Phase 3: Build Cube
- if (config.isInMemCubing()) {
- result.addTask(createSaveStatisticsStep(jobId));
-
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
- result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
- // bulk load step
- result.addTask(createBulkLoadStep(jobId));
- } else {
- final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
- final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
- final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-
- // base cuboid step
- result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
-
- // n dim cuboid steps
- for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
- int dimNum = totalRowkeyColumnsCount - i;
- result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
- }
- result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
- // generate hfiles step
- result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
- // bulk load step
- result.addTask(createBulkLoadStep(jobId));
+ final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
+ final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+ final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+ // base cuboid step
+ result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
+ // n dim cuboid steps
+ for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+ int dimNum = totalRowkeyColumnsCount - i;
+ result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
}
+ outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
- inputSide.addStepPhase4_UpdateMetadataAndCleanup(result);
-
- return result;
- }
-
- private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- MapReduceExecutable result = new MapReduceExecutable();
- result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
- result.setMapReduceJobClass(FactDistinctColumnsJob.class);
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
+ inputSide.addStepPhase4_Cleanup(result);
+ outputSide.addStepPhase4_Cleanup(result);
- result.setMapReduceParams(cmd.toString());
return result;
}
- private HadoopShellExecutable createBuildDictionaryStep(String jobId) {
- // base cuboid job
- HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
- buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
-
- buildDictionaryStep.setJobParams(cmd.toString());
- buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
- return buildDictionaryStep;
- }
-
private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
// base cuboid job
MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
@@ -165,52 +114,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
return ndCuboidStep;
}
- private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
- SaveStatisticsStep result = new SaveStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setStatisticsPath(getStatisticsPath(jobId));
- return result;
- }
-
- private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- // base cuboid job
- MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
-
- baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "level", "0");
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
- baseCuboidStep.setMapReduceParams(cmd.toString());
- baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
- return baseCuboidStep;
- }
-
- private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
- final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
- updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
- updateCubeInfoStep.setSegmentId(seg.getUuid());
- updateCubeInfoStep.setCubingJobId(jobId);
- return updateCubeInfoStep;
- }
-
- private String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
- return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
- }
-
private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
String[] paths = new String[groupRowkeyColumnsCount + 1];
for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index cb2d8dd..6264ebd 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -22,23 +22,25 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class BatchMergeJobBuilder extends JobBuilderSupport {
+ private final IMRBatchMergeOutputSide outputSide;
+
public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
+ this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
}
public CubingJob build() {
- final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
@@ -46,66 +48,23 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
final List<String> mergingSegmentIds = Lists.newArrayList();
final List<String> mergingCuboidPaths = Lists.newArrayList();
- final List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
- mergingHTables.add(merging.getStorageLocationIdentifier());
}
+ // Phase 1: Merge Dictionary
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
- if (config.isInMemCubing()) {
-
- String mergedStatisticsFolder = getStatisticsPath(jobId);
- result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
-
- String formattedTables = StringUtil.join(mergingHTables, ",");
- result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
- } else {
- // merge cuboid
- String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
- result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
-
- // convert htable
- result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
- // generate hfiles step
- result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
- }
-
- // bulk load step
- result.addTask(createBulkLoadStep(jobId));
-
- // update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
- result.addTask(createGarbageCollectionStep(mergingHTables, null));
+ // Phase 2: Merge Cube Files
+ String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+ result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+ outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
- return result;
- }
-
- private MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
- MergeDictionaryStep result = new MergeDictionaryStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- return result;
- }
+ // Phase 3: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+ outputSide.addStepPhase3_Cleanup(result);
- private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
- MergeStatisticsStep result = new MergeStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setMergedStatisticsPath(mergedStatisticsFolder);
return result;
}
@@ -126,35 +85,4 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
return mergeCuboidDataStep;
}
-
- private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
- MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
- mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", inputTableNames);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
- mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
- mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
- return mergeCuboidDataStep;
- }
-
- private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
- UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
- result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setCubingJobId(jobId);
- return result;
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
deleted file mode 100644
index d79f35d..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Drop the resources that is no longer needed, including intermediate hive table (after cube build) and hbase tables (after cube merge)
- */
-@Deprecated // only exists for backward compatibility
-public class GarbageCollectionStep extends AbstractExecutable {
-
- private static final String OLD_HTABLES = "oldHTables";
-
- private static final String OLD_HIVE_TABLE = "oldHiveTable";
-
- private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
- public GarbageCollectionStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
- StringBuffer output = new StringBuffer();
-
- final String hiveTable = this.getOldHiveTable();
- if (StringUtils.isNotEmpty(hiveTable)) {
- final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS " + hiveTable + ";\"";
- ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
- try {
- context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
- output.append("Hive table " + hiveTable + " is dropped. \n");
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
- return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
- }
- }
-
-
- List<String> oldTables = getOldHTables();
- if (oldTables != null && oldTables.size() > 0) {
- String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin admin = null;
- try {
- admin = new HBaseAdmin(conf);
- for (String table : oldTables) {
- if (admin.tableExists(table)) {
- HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
- String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
- if (metadataUrlPrefix.equalsIgnoreCase(host)) {
- if (admin.isTableEnabled(table)) {
- admin.disableTable(table);
- }
- admin.deleteTable(table);
- logger.debug("Dropped htable: " + table);
- output.append("HBase table " + table + " is dropped. \n");
- } else {
- logger.debug("Skip htable: " + table);
- output.append("Skip htable: " + table + ". \n");
- }
- }
- }
-
- } catch (IOException e) {
- output.append("Got error when drop HBase table, exiting... \n");
- // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
- return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
- } finally {
- if (admin != null)
- try {
- admin.close();
- } catch (IOException e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- }
-
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- }
-
- public void setOldHTables(List<String> ids) {
- setParam(OLD_HTABLES, StringUtils.join(ids, ","));
- }
-
- private List<String> getOldHTables() {
- final String ids = getParam(OLD_HTABLES);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id : splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
- public void setOldHiveTable(String hiveTable) {
- setParam(OLD_HIVE_TABLE, hiveTable);
- }
-
- private String getOldHiveTable() {
- return getParam(OLD_HIVE_TABLE);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 08ed94a..fb4767e 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -48,7 +48,7 @@ public interface IMRInput {
/**
* Participate the batch cubing flow as the input side. Responsible for creating
- * intermediate flat table (Phase 1) and clean up if necessary (Phase 4).
+ * intermediate flat table (Phase 1) and clean up any leftover (Phase 4).
*
* - Phase 1: Create Flat Table
* - Phase 2: Build Dictionary
@@ -61,7 +61,7 @@ public interface IMRInput {
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
/** Add step that does necessary clean up, like delete the intermediate flat table */
- public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
/** Return an InputFormat that reads from the intermediate flat table */
public IMRTableInputFormat getFlatTableInputFormat();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
deleted file mode 100644
index 6a94920..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.util.List;
-
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public interface IMRJobFlowParticipant {
-
- public List<? extends AbstractExecutable> contributePhase1CreateFlatTable(List<? extends AbstractExecutable> steps);
-
- public List<? extends AbstractExecutable> contributePhase2CreateDictionary(List<? extends AbstractExecutable> steps);
-
- public List<? extends AbstractExecutable> contributePhase3BuildCube(List<? extends AbstractExecutable> steps);
-
- public List<? extends AbstractExecutable> contributePhase4UpdateCubeMetadata(List<? extends AbstractExecutable> steps);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 6e3a42c..8896a2e 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -18,9 +18,49 @@
package org.apache.kylin.engine.mr;
-public interface IMROutput {
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
- public IMRJobFlowParticipant createBuildFlowParticipant();
+public interface IMROutput {
- public IMRJobFlowParticipant createMergeFlowParticipant();
+ /** Return a helper to participate in batch cubing job flow. */
+ public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch cubing flow as the output side. Responsible for saving
+ * the cuboid output to storage (Phase 3).
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface IMRBatchCubingOutputSide {
+
+ /** Add step that saves cuboid output from HDFS to storage. */
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ /** Return a helper to participate in batch merge job flow. */
+ public IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch merge flow as the output side. Responsible for saving
+ * the cuboid output to storage (Phase 2).
+ *
+ * - Phase 1: Merge Dictionary
+ * - Phase 2: Merge Cube
+ * - Phase 3: Update Metadata & Cleanup
+ */
+ public interface IMRBatchMergeOutputSide {
+
+ /** Add step that saves cuboid output from HDFS to storage. */
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 79430f6..4652269 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,120 +23,115 @@ import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
+import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
import com.google.common.base.Preconditions;
/**
- * Hold reusable methods for real builders.
+ * Hold reusable steps for builders.
*/
-abstract public class JobBuilderSupport {
+public class JobBuilderSupport {
- protected final JobEngineConfig config;
- protected final CubeSegment seg;
- protected final String submitter;
+ final protected JobEngineConfig config;
+ final protected CubeSegment seg;
+ final protected String submitter;
- protected JobBuilderSupport(CubeSegment seg, String submitter) {
+ public JobBuilderSupport(CubeSegment seg, String submitter) {
Preconditions.checkNotNull(seg, "segment cannot be null");
this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
this.seg = seg;
this.submitter = submitter;
}
- protected MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
- MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
- rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+ public MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+ return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, false);
+ }
+
+ public MapReduceExecutable createFactDistinctColumnsStepWithStats(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+ return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, true);
+ }
+
+ private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId, boolean withStats) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
+ result.setMapReduceJobClass(FactDistinctColumnsJob.class);
StringBuilder cmd = new StringBuilder();
-
appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+ appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+ appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+ appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
- rowkeyDistributionStep.setMapReduceParams(cmd.toString());
- rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
- return rowkeyDistributionStep;
+ result.setMapReduceParams(cmd.toString());
+ return result;
}
- protected HadoopShellExecutable createCreateHTableStep(String jobId) {
- HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
- createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+ public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
+ // base cuboid job
+ HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
+ buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
-
- createHtableStep.setJobParams(cmd.toString());
- createHtableStep.setJobClass(CreateHTableJob.class);
+ appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
- return createHtableStep;
+ buildDictionaryStep.setJobParams(cmd.toString());
+ buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
+ return buildDictionaryStep;
}
- protected MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
- MapReduceExecutable createHFilesStep = new MapReduceExecutable();
- createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
- createHFilesStep.setMapReduceParams(cmd.toString());
- createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
- createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-
- return createHFilesStep;
+ public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
+ final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
+ updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+ updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
+ updateCubeInfoStep.setSegmentId(seg.getUuid());
+ updateCubeInfoStep.setCubingJobId(jobId);
+ return updateCubeInfoStep;
}
- protected HadoopShellExecutable createBulkLoadStep(String jobId) {
- HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
- bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
- bulkLoadStep.setJobParams(cmd.toString());
- bulkLoadStep.setJobClass(BulkLoadJob.class);
-
- return bulkLoadStep;
+ public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
+ MergeDictionaryStep result = new MergeDictionaryStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ return result;
}
-
- protected GarbageCollectionStep createGarbageCollectionStep(List<String> oldHtables, String interimTable) {
- GarbageCollectionStep result = new GarbageCollectionStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- result.setOldHTables(oldHtables);
- result.setOldHiveTable(interimTable);
+
+ public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
+ UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
+ result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ result.setCubingJobId(jobId);
return result;
}
- protected String getJobWorkingDir(String jobId) {
+ // ============================================================================
+
+ public String getJobWorkingDir(String jobId) {
return getJobWorkingDir(config, jobId);
}
- protected String getCuboidRootPath(String jobId) {
+ public String getCuboidRootPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
}
- protected String getCuboidRootPath(CubeSegment seg) {
+ public String getCuboidRootPath(CubeSegment seg) {
return getCuboidRootPath(seg.getLastBuildJobID());
}
- protected void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
+ public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
try {
String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
if (jobConf != null && jobConf.length() > 0) {
@@ -146,25 +141,20 @@ abstract public class JobBuilderSupport {
throw new RuntimeException(e);
}
}
+
+ public String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
+ return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
+ }
- protected String getFactDistinctColumnsPath(String jobId) {
+ public String getFactDistinctColumnsPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
}
- protected String getStatisticsPath(String jobId) {
+ public String getStatisticsPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
}
-
- protected String getHFilePath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
- }
-
- protected String getRowkeyDistributionOutputPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
- }
-
// ============================================================================
// static methods also shared by other job flow participant
// ----------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index b374a99..61328c9 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -18,15 +18,9 @@
package org.apache.kylin.engine.mr;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
public class MRBatchCubingEngine implements IBatchCubingEngine {
@@ -40,25 +34,14 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
return new BatchMergeJobBuilder(mergeSegment, submitter).build();
}
- public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
- TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
- return getMRInput(tableDesc).getBatchCubingInputSide(seg);
- }
-
- public static IMRTableInputFormat getTableInputFormat(String tableName) {
- return getTableInputFormat(getTableDesc(tableName));
- }
-
- public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
- return getMRInput(tableDesc).getTableInputFormat(tableDesc);
- }
-
- private static IMRInput getMRInput(TableDesc tableDesc) {
- return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+ @Override
+ public Class<?> getSourceInterface() {
+ return IMRInput.class;
}
- private static TableDesc getTableDesc(String tableName) {
- return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+ @Override
+ public Class<?> getStorageInterface() {
+ return IMROutput.class;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
new file mode 100644
index 0000000..099a614
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -0,0 +1,45 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
+import org.apache.kylin.storage.StorageFactory2;
+
+public class MRUtil {
+
+ public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
+ return getMRInput(tableDesc).getBatchCubingInputSide(seg);
+ }
+
+ public static IMRTableInputFormat getTableInputFormat(String tableName) {
+ return getTableInputFormat(getTableDesc(tableName));
+ }
+
+ public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+ return getMRInput(tableDesc).getTableInputFormat(tableDesc);
+ }
+
+ private static IMRInput getMRInput(TableDesc tableDesc) {
+ return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+ }
+
+ private static TableDesc getTableDesc(String tableName) {
+ return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+ }
+
+ public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchMergeOutputSide(seg);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..e83db30
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr2;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+ private final IMRBatchCubingInputSide inputSide;
+
+ public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+ super(newSegment, submitter);
+ this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+ }
+
+ public CubingJob build() {
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final String jobId = result.getId();
+ final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+
+ // Phase 1: Create Flat Table
+ inputSide.addStepPhase1_CreateFlatTable(result);
+
+ // Phase 2: Build Dictionary
+ result.addTask(createFactDistinctColumnsStepWithStats(flatHiveTableDesc, jobId));
+ result.addTask(createBuildDictionaryStep(jobId));
+
+ // Phase 3: Build Cube
+ result.addTask(createSaveStatisticsStep(jobId)); //<<<<<
+
+ // create htable step
+ result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+ result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
+ // bulk load step
+ result.addTask(createBulkLoadStep(jobId)); //<<<<<
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+ inputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+ SaveStatisticsStep result = new SaveStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setStatisticsPath(getStatisticsPath(jobId));
+ return result;
+ }
+
+ private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+ // base cuboid job
+ MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, seg);
+
+ baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+ appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "level", "0");
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+ baseCuboidStep.setMapReduceParams(cmd.toString());
+ baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+ baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+ return baseCuboidStep;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..25ff082
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr2;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+ public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+ super(mergeSegment, submitter);
+ }
+
+ public CubingJob build() {
+ final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final String jobId = result.getId();
+
+ final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+ final List<String> mergingSegmentIds = Lists.newArrayList();
+ final List<String> mergingCuboidPaths = Lists.newArrayList();
+ final List<String> mergingHTables = Lists.newArrayList();
+ for (CubeSegment merging : mergingSegments) {
+ mergingSegmentIds.add(merging.getUuid());
+ mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
+ mergingHTables.add(merging.getStorageLocationIdentifier());
+ }
+
+ result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+
+ String mergedStatisticsFolder = getStatisticsPath(jobId);
+ result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
+
+ // create htable step
+ result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+
+ String formattedTables = StringUtil.join(mergingHTables, ",");
+ result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
+
+ // bulk load step
+ result.addTask(createBulkLoadStep(jobId)); //<<<<<
+
+ // update cube info
+ result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+
+ result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
+
+ return result;
+ }
+
+ private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+ MergeStatisticsStep result = new MergeStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ result.setMergedStatisticsPath(mergedStatisticsFolder);
+ return result;
+ }
+
+ private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
+ MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+ mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", inputTableNames);
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+ mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+ mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+ mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+ return mergeCuboidDataStep;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
new file mode 100644
index 0000000..aeddb9b
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
@@ -0,0 +1,5 @@
+package org.apache.kylin.engine.mr2;
+
+public interface IMROutput2 {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..8ec6f69
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr2;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+ @Override
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return new BatchCubingJobBuilder2(newSegment, submitter).build();
+ }
+
+ @Override
+ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+ }
+
+ @Override
+ public Class<?> getSourceInterface() {
+ return IMRInput.class;
+ }
+
+ @Override
+ public Class<?> getStorageInterface() {
+ return IMROutput2.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 422c6d7..2eb9b31 100644
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -35,10 +35,6 @@ public class JobEngineConfig {
public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
- public boolean isInMemCubing() {
- return config.isCubingInMem();
- }
-
private static File getJobConfig(String fileName) {
String path = System.getProperty(KylinConfig.KYLIN_CONF);
if (StringUtils.isNotEmpty(path)) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index 8da6cde..071554f 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.mr.KylinMapper;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.MetadataManager;
@@ -62,7 +62,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
String tableName = conf.get(BatchConstants.TABLE_NAME);
tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
- tableInputFormat = MRBatchCubingEngine.getTableInputFormat(tableDesc);
+ tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index 58fd509..0670178 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
@@ -77,7 +77,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
job.getConfiguration().set("dfs.block.size", "67108864");
// Mapper
- IMRTableInputFormat tableInputFormat = MRBatchCubingEngine.getTableInputFormat(table);
+ IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table);
tableInputFormat.configureJob(job);
job.setMapperClass(ColumnCardinalityMapper.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 77c4f7e..1697339 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -34,7 +34,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -100,7 +100,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
}
private void setupMapper(CubeSegment cubeSeg) throws IOException {
- IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
flatTableInputFormat.configureJob(job);
job.setMapperClass(FactDistinctHiveColumnsMapper.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index ee8ab3e..09dd3bf 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -18,7 +18,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -68,7 +68,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
}
}
- flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
}
protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index f89b143..c3c9539 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -36,7 +36,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.DataModelDesc;
@@ -87,7 +87,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
// set Mapper
- IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
flatTableInputFormat.configureJob(job);
job.setMapperClass(InMemCuboidMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 3eb29df..b0d8b2c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -26,7 +26,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
@@ -61,7 +61,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, Immutab
cubeDesc = cube.getDescriptor();
String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4b01f24..f005bc9 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -125,7 +125,7 @@ public class HiveMRInput implements IMRInput {
}
@Override
- public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow) {
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
GarbageCollectionStep step = new GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
step.setOldHiveTable(flatHiveTableDesc.getTableName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/IStorage.java b/job/src/main/java/org/apache/kylin/storage/IStorage.java
deleted file mode 100644
index 89b96e9..0000000
--- a/job/src/main/java/org/apache/kylin/storage/IStorage.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.metadata.realization.IRealization;
-
-public interface IStorage {
-
- public IStorageQuery createStorageQuery(IRealization realization);
-
- public <I> I adaptToBuildEngine(Class<I> engineInterface);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java b/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java
new file mode 100644
index 0000000..9a1dcd7
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/StorageFactory2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.storage.hbase.HBaseStorage;
+
+/**
+ */
+public class StorageFactory2 {
+
+ private static final IStorage dft = new HBaseStorage();
+
+ public static <T> T createEngineAdapter(IRealization realization, Class<T> engineInterface) {
+ return dft.adaptToBuildEngine(engineInterface);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
index 3de467e..5588d81 100644
--- a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
@@ -18,21 +18,43 @@
package org.apache.kylin.storage.hbase;
-import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
public class HBaseMROutput implements IMROutput {
@Override
- public IMRJobFlowParticipant createBuildFlowParticipant() {
- // TODO Auto-generated method stub
- return null;
+ public IMRBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) {
+ return new IMRBatchCubingOutputSide() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+ steps.addSaveCuboidToHTableSteps(jobFlow, cuboidRootPath);
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ // nothing to do
+ }
+ };
}
@Override
- public IMRJobFlowParticipant createMergeFlowParticipant() {
- // TODO Auto-generated method stub
- return null;
- }
+ public IMRBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
+ return new IMRBatchMergeOutputSide() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+ @Override
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+ steps.addSaveCuboidToHTableSteps(jobFlow, cuboidRootPath);
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createMergeGCStep());
+ }
+ };
+ }
}
[3/5] incubator-kylin git commit: KYLIN-878 HBase storage abstraction
for cubing flow
Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java
new file mode 100644
index 0000000..d4a0e98
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMRSteps.java
@@ -0,0 +1,135 @@
+package org.apache.kylin.storage.hbase;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
+import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
+import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
+import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class HBaseMRSteps extends JobBuilderSupport {
+
+ public HBaseMRSteps(CubeSegment seg) {
+ super(seg, null);
+ }
+
+ public void addSaveCuboidToHTableSteps(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+ String jobId = jobFlow.getId();
+
+ // calculate key distribution
+ jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
+ // create htable step
+ jobFlow.addTask(createCreateHTableStep(jobId));
+ // generate hfiles step
+ jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
+ // bulk load step
+ jobFlow.addTask(createBulkLoadStep(jobId));
+ }
+
+ public MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
+ MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
+ rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+
+ rowkeyDistributionStep.setMapReduceParams(cmd.toString());
+ rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
+ return rowkeyDistributionStep;
+ }
+
+ public HadoopShellExecutable createCreateHTableStep(String jobId) {
+ return createCreateHTableStep(jobId, false);
+ }
+
+ public HadoopShellExecutable createCreateHTableStepWithStats(String jobId) {
+ return createCreateHTableStep(jobId, true);
+ }
+
+ private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
+ HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
+ createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+ StringBuilder cmd = new StringBuilder();
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+
+ createHtableStep.setJobParams(cmd.toString());
+ createHtableStep.setJobClass(CreateHTableJob.class);
+
+ return createHtableStep;
+ }
+
+ public MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
+ MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+ createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
+
+ createHFilesStep.setMapReduceParams(cmd.toString());
+ createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
+ createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+ return createHFilesStep;
+ }
+
+ public HadoopShellExecutable createBulkLoadStep(String jobId) {
+ HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+ bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+
+ StringBuilder cmd = new StringBuilder();
+ appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+
+ bulkLoadStep.setJobParams(cmd.toString());
+ bulkLoadStep.setJobClass(BulkLoadJob.class);
+
+ return bulkLoadStep;
+ }
+
+ public MergeGCStep createMergeGCStep() {
+ final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+ final List<String> mergingHTables = Lists.newArrayList();
+ for (CubeSegment merging : mergingSegments) {
+ mergingHTables.add(merging.getStorageLocationIdentifier());
+ }
+
+ MergeGCStep result = new MergeGCStep();
+ result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ result.setOldHTables(mergingHTables);
+ return result;
+ }
+
+ public String getHFilePath(String jobId) {
+ return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+ }
+
+ public String getRowkeyDistributionOutputPath(String jobId) {
+ return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java b/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java
new file mode 100644
index 0000000..5fc57e3
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/MergeGCStep.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Drop HBase tables that are no longer needed
+ */
+public class MergeGCStep extends AbstractExecutable {
+
+ private static final String OLD_HTABLES = "oldHTables";
+
+ private static final Logger logger = LoggerFactory.getLogger(MergeGCStep.class);
+
+ public MergeGCStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+ StringBuffer output = new StringBuffer();
+
+ List<String> oldTables = getOldHTables();
+ if (oldTables != null && oldTables.size() > 0) {
+ String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin admin = null;
+ try {
+ admin = new HBaseAdmin(conf);
+ for (String table : oldTables) {
+ if (admin.tableExists(table)) {
+ HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+ String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
+ if (metadataUrlPrefix.equalsIgnoreCase(host)) {
+ if (admin.isTableEnabled(table)) {
+ admin.disableTable(table);
+ }
+ admin.deleteTable(table);
+ logger.debug("Dropped htable: " + table);
+ output.append("HBase table " + table + " is dropped. \n");
+ } else {
+ logger.debug("Skip htable: " + table);
+ output.append("Skip htable: " + table + ". \n");
+ }
+ }
+ }
+
+ } catch (IOException e) {
+ output.append("Got error when drop HBase table, exiting... \n");
+ // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
+ return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
+ } finally {
+ if (admin != null)
+ try {
+ admin.close();
+ } catch (IOException e) {
+ logger.error(e.getLocalizedMessage());
+ }
+ }
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ public void setOldHTables(List<String> ids) {
+ setParam(OLD_HTABLES, StringUtils.join(ids, ","));
+ }
+
+ private List<String> getOldHTables() {
+ final String ids = getParam(OLD_HTABLES);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index e938cbd..bea0e36 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -32,7 +32,7 @@ import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.storage.IStorageQuery;
-import org.apache.kylin.storage.StorageEngineFactory;
+import org.apache.kylin.storage.StorageFactory;
import org.apache.kylin.storage.hbase.coprocessor.DictCodeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,7 +111,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
olapContext.resetSQLDigest();
// query storage engine
- IStorageQuery storageEngine = StorageEngineFactory.getStorageEngine(olapContext.realization);
+ IStorageQuery storageEngine = StorageFactory.createQuery(olapContext.realization);
ITupleIterator iterator = storageEngine.search(olapContext.storageContext, olapContext.getSQLDigest(), olapContext.returnTupleInfo);
if (logger.isDebugEnabled()) {
logger.debug("return TupleIterator...");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
deleted file mode 100644
index 1fcd9b3..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageEngine.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.kylin.storage;
-
-import com.google.common.collect.Range;
-
-/**
- */
-public interface ICachableStorageEngine extends IStorageQuery {
- /**
- *
- * being dynamic => getVolatilePeriod() return not null
- * being dynamic => partition column of its realization not null
- *
- * @return true for static storage like cubes
- * false for dynamic storage like II
- */
- boolean isDynamic();
-
- /**
- * volatile period is the period of time in which the returned data is not stable
- * e.g. inverted index's last several minutes' data is dynamic as time goes by.
- * data in this period cannot be cached
- *
- * This method should not be called before ITupleIterator.close() is called
- *
- * @return null if the underlying storage guarantees the data is static
- */
- Range<Long> getVolatilePeriod();
-
- /**
- * get the uuid for the realization assigned to this storage engine
- * @return
- */
- String getStorageUUID();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
new file mode 100644
index 0000000..179202e
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/ICachableStorageQuery.java
@@ -0,0 +1,33 @@
+package org.apache.kylin.storage;
+
+import com.google.common.collect.Range;
+
+/**
+ */
+public interface ICachableStorageQuery extends IStorageQuery {
+ /**
+ *
+ * being dynamic => getVolatilePeriod() returns non-null
+ * being dynamic => partition column of its realization not null
+ *
+ * @return true for dynamic storage like II,
+ * false for static storage like cubes
+ */
+ boolean isDynamic();
+
+ /**
+ * volatile period is the period of time in which the returned data is not stable
+ * e.g. inverted index's last several minutes' data is dynamic as time goes by.
+ * data in this period cannot be cached
+ *
+ * This method should not be called before ITupleIterator.close() is called
+ *
+ * @return null if the underlying storage guarantees the data is static
+ */
+ Range<Long> getVolatilePeriod();
+
+ /**
+ * get the uuid for the realization that serves this query
+ */
+ String getStorageUUID();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/IStorage.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/IStorage.java b/storage/src/main/java/org/apache/kylin/storage/IStorage.java
new file mode 100644
index 0000000..89b96e9
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/IStorage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.metadata.realization.IRealization;
+
+public interface IStorage {
+
+ public IStorageQuery createStorageQuery(IRealization realization);
+
+ public <I> I adaptToBuildEngine(Class<I> engineInterface);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
deleted file mode 100644
index 3afcd32..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
-import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
-import org.apache.kylin.storage.hbase.CubeStorageEngine;
-import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridStorageEngine;
-
-import com.google.common.base.Preconditions;
-
-/**
- * @author xjiang
- */
-public class StorageEngineFactory {
- private static boolean allowStorageLayerCache = true;
-
- public static IStorageQuery getStorageEngine(IRealization realization) {
-
- if (realization.getType() == RealizationType.INVERTED_INDEX) {
- ICachableStorageEngine ret = new InvertedIndexStorageEngine((IIInstance) realization);
- if (allowStorageLayerCache) {
- return wrapWithCache(ret, realization);
- } else {
- return ret;
- }
- } else if (realization.getType() == RealizationType.CUBE) {
- ICachableStorageEngine ret = new CubeStorageEngine((CubeInstance) realization);
- if (allowStorageLayerCache) {
- return wrapWithCache(ret, realization);
- } else {
- return ret;
- }
- } else {
- return new HybridStorageEngine((HybridInstance) realization);
- }
- }
-
- private static IStorageQuery wrapWithCache(ICachableStorageEngine underlyingStorageEngine, IRealization realization) {
- if (underlyingStorageEngine.isDynamic()) {
- return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
- } else {
- return new CacheFledgedStaticStorageEngine(underlyingStorageEngine);
- }
- }
-
- private static TblColRef getPartitionCol(IRealization realization) {
- String modelName = realization.getModelName();
- DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
- PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
- Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
- TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
- Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
- return partitionColRef;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
new file mode 100644
index 0000000..77e3010
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine;
+import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine;
+import org.apache.kylin.storage.hbase.CubeStorageQuery;
+import org.apache.kylin.storage.hbase.InvertedIndexStorageQuery;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridStorageEngine;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * @author xjiang
+ */
+public class StorageFactory {
+
+ private static boolean allowStorageLayerCache = true;
+
+ public static IStorageQuery createQuery(IRealization realization) {
+
+ if (realization.getType() == RealizationType.INVERTED_INDEX) {
+ ICachableStorageQuery ret = new InvertedIndexStorageQuery((IIInstance) realization);
+ if (allowStorageLayerCache) {
+ return wrapWithCache(ret, realization);
+ } else {
+ return ret;
+ }
+ } else if (realization.getType() == RealizationType.CUBE) {
+ ICachableStorageQuery ret = new CubeStorageQuery((CubeInstance) realization);
+ if (allowStorageLayerCache) {
+ return wrapWithCache(ret, realization);
+ } else {
+ return ret;
+ }
+ } else {
+ return new HybridStorageEngine((HybridInstance) realization);
+ }
+ }
+
+ private static IStorageQuery wrapWithCache(ICachableStorageQuery underlyingStorageEngine, IRealization realization) {
+ if (underlyingStorageEngine.isDynamic()) {
+ return new CacheFledgedDynamicStorageEngine(underlyingStorageEngine, getPartitionCol(realization));
+ } else {
+ return new CacheFledgedStaticStorageEngine(underlyingStorageEngine);
+ }
+ }
+
+ private static TblColRef getPartitionCol(IRealization realization) {
+ String modelName = realization.getModelName();
+ DataModelDesc dataModelDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
+ PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
+ Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
+ TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
+ Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
+ return partitionColRef;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
index 34a8066..61e008f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/AbstractCacheFledgedStorageEngine.java
@@ -10,7 +10,7 @@ import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
import org.apache.kylin.metadata.realization.StreamSQLDigest;
import org.apache.kylin.metadata.tuple.TeeTupleItrListener;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
import org.apache.kylin.storage.IStorageQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,10 +24,10 @@ public abstract class AbstractCacheFledgedStorageEngine implements IStorageQuery
protected static CacheManager CACHE_MANAGER;
protected boolean queryCacheExists;
- protected ICachableStorageEngine underlyingStorage;
+ protected ICachableStorageQuery underlyingStorage;
protected StreamSQLDigest streamSQLDigest;
- public AbstractCacheFledgedStorageEngine(ICachableStorageEngine underlyingStorage) {
+ public AbstractCacheFledgedStorageEngine(ICachableStorageQuery underlyingStorage) {
this.underlyingStorage = underlyingStorage;
this.makeCacheIfNecessary(underlyingStorage.getStorageUUID());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
index 0cfa382..86e2e88 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedDynamicStorageEngine.java
@@ -12,7 +12,7 @@ import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.realization.SQLDigestUtil;
import org.apache.kylin.metadata.realization.StreamSQLDigest;
import org.apache.kylin.metadata.tuple.*;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.TsConditionExtractor;
import org.apache.kylin.storage.tuple.TupleInfo;
@@ -30,7 +30,7 @@ public class CacheFledgedDynamicStorageEngine extends AbstractCacheFledgedStorag
private Range<Long> ts;
- public CacheFledgedDynamicStorageEngine(ICachableStorageEngine underlyingStorage, TblColRef partitionColRef) {
+ public CacheFledgedDynamicStorageEngine(ICachableStorageQuery underlyingStorage, TblColRef partitionColRef) {
super(underlyingStorage);
this.partitionColRef = partitionColRef;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
index 60b8a1b..1e95722 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cache/CacheFledgedStaticStorageEngine.java
@@ -9,7 +9,7 @@ import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.SimpleTupleIterator;
import org.apache.kylin.metadata.tuple.TeeTupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
+import org.apache.kylin.storage.ICachableStorageQuery;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.tuple.TupleInfo;
import org.slf4j.Logger;
@@ -20,7 +20,7 @@ import java.util.List;
public class CacheFledgedStaticStorageEngine extends AbstractCacheFledgedStorageEngine {
private static final Logger logger = LoggerFactory.getLogger(CacheFledgedStaticStorageEngine.class);
- public CacheFledgedStaticStorageEngine(ICachableStorageEngine underlyingStorage) {
+ public CacheFledgedStaticStorageEngine(ICachableStorageQuery underlyingStorage) {
super(underlyingStorage);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
deleted file mode 100644
index 549961f..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
+++ /dev/null
@@ -1,371 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.DerivedFilterTranslator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
-public class CubeStorageEngine implements ICachableStorageEngine {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeStorageEngine.class);
-
- private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
-
- private final CubeInstance cubeInstance;
- private final CubeDesc cubeDesc;
-
- public CubeStorageEngine(CubeInstance cube) {
- this.cubeInstance = cube;
- this.cubeDesc = cube.getDescriptor();
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- Collection<TblColRef> groups = sqlDigest.groupbyColumns;
- TupleFilter filter = sqlDigest.filter;
-
- // build dimension & metrics
- Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
- Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
- buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
-
- // all dimensions = groups + filter dimensions
- Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
- filterDims.removeAll(groups);
-
- // expand derived (xxxD means contains host columns only, derived columns were translated)
- Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
- Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
- Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
- filterDimsD.removeAll(groupsD);
- derivedPostAggregation.removeAll(groups);
-
- // identify cuboid
- Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
- dimensionsD.addAll(groupsD);
- dimensionsD.addAll(filterDimsD);
- Cuboid cuboid = identifyCuboid(dimensionsD);
- context.setCuboid(cuboid);
-
- // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
- Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
- boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
- context.setExactAggregation(isExactAggregation);
-
- if (isExactAggregation) {
- metrics = replaceHolisticCountDistinct(metrics);
- }
-
- // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
- TupleFilter filterD = translateDerived(filter, groupsD);
-
- setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
- // TODO enable coprocessor
-// setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
- setLimit(filter, context);
-
- List<CubeScanner> scanners = Lists.newArrayList();
- for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
- scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD));
- }
-
- if (scanners.isEmpty())
- return ITupleIterator.EMPTY_TUPLE_ITERATOR;
-
- return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
- }
-
- private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
- for (FunctionDesc func : sqlDigest.aggregations) {
- if (!func.isDimensionAsMetric()) {
- // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
- metrics.add(findAggrFuncFromCubeDesc(func));
- }
- }
-
- for (TblColRef column : sqlDigest.allColumns) {
- // skip measure columns
- if (sqlDigest.metricColumns.contains(column)) {
- continue;
- }
- dimensions.add(column);
- }
- }
-
- private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- if (measure.getFunction().equals(aggrFunc))
- return measure.getFunction();
- }
- return aggrFunc;
- }
-
- private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
- Set<TblColRef> expanded = Sets.newHashSet();
- for (TblColRef col : cols) {
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- for (TblColRef hostCol : hostInfo.columns) {
- expanded.add(hostCol);
- if (hostInfo.isOneToOne == false)
- derivedPostAggregation.add(hostCol);
- }
- } else {
- expanded.add(col);
- }
- }
- return expanded;
- }
-
- private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
- long cuboidID = 0;
- for (TblColRef column : dimensions) {
- int index = cubeDesc.getRowkey().getColumnBitIndex(column);
- cuboidID |= 1L << index;
- }
- return Cuboid.findById(cubeDesc, cuboidID);
- }
-
- @SuppressWarnings("unchecked")
- private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
- Collection<? extends TupleFilter> toCheck;
- if (filter instanceof CompareTupleFilter) {
- toCheck = Collections.singleton(filter);
- } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
- toCheck = filter.getChildren();
- } else {
- return (Set<TblColRef>) Collections.EMPTY_SET;
- }
-
- Set<TblColRef> result = Sets.newHashSet();
- for (TupleFilter f : toCheck) {
- if (f instanceof CompareTupleFilter) {
- CompareTupleFilter compFilter = (CompareTupleFilter) f;
- // is COL=const ?
- if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
- result.add(compFilter.getColumn());
- }
- }
- }
-
- // expand derived
- Set<TblColRef> resultD = Sets.newHashSet();
- for (TblColRef col : result) {
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- if (hostInfo.isOneToOne) {
- for (TblColRef hostCol : hostInfo.columns) {
- resultD.add(hostCol);
- }
- }
- //if not one2one, it will be pruned
- } else {
- resultD.add(col);
- }
- }
- return resultD;
- }
-
- private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
- boolean exact = true;
-
- if (cuboid.requirePostAggregation()) {
- exact = false;
- logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
- }
-
- // derived aggregation is bad, unless expanded columns are already in group by
- if (groups.containsAll(derivedPostAggregation) == false) {
- exact = false;
- logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
- }
-
- // other columns (from filter) is bad, unless they are ensured to have single value
- if (singleValuesD.containsAll(othersD) == false) {
- exact = false;
- logger.info("exactAggregation is false because some column not on group by: " + othersD //
- + " (single value column: " + singleValuesD + ")");
- }
-
- if (exact) {
- logger.info("exactAggregation is true");
- }
- return exact;
- }
-
- private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
- // for count distinct, try use its holistic version if possible
- Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
- for (FunctionDesc metric : metrics) {
- if (metric.isCountDistinct() == false) {
- result.add(metric);
- continue;
- }
-
- FunctionDesc holisticVersion = null;
- for (MeasureDesc measure : cubeDesc.getMeasures()) {
- FunctionDesc measureFunc = measure.getFunction();
- if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
- holisticVersion = measureFunc;
- }
- }
- result.add(holisticVersion == null ? metric : holisticVersion);
- }
- return result;
- }
-
- @SuppressWarnings("unchecked")
- private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
- if (filter == null)
- return filter;
-
- if (filter instanceof CompareTupleFilter) {
- return translateDerivedInCompare((CompareTupleFilter) filter, collector);
- }
-
- List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
- List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
- boolean modified = false;
- for (TupleFilter child : children) {
- TupleFilter translated = translateDerived(child, collector);
- newChildren.add(translated);
- if (child != translated)
- modified = true;
- }
- if (modified) {
- filter = replaceChildren(filter, newChildren);
- }
- return filter;
- }
-
- private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
- if (filter instanceof LogicalTupleFilter) {
- LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
- r.addChildren(newChildren);
- return r;
- } else
- throw new IllegalStateException("Cannot replaceChildren on " + filter);
- }
-
- private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
- if (compf.getColumn() == null || compf.getValues().isEmpty())
- return compf;
-
- TblColRef derived = compf.getColumn();
- if (cubeDesc.isDerived(derived) == false)
- return compf;
-
- DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
- CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
- CubeSegment seg = cubeInstance.getLatestReadySegment();
- LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
- Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
- TupleFilter translatedFilter = translated.getFirst();
- boolean loosened = translated.getSecond();
- if (loosened) {
- collectColumnsRecursively(translatedFilter, collector);
- }
- return translatedFilter;
- }
-
- private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
- if (filter instanceof ColumnTupleFilter) {
- collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
- }
- for (TupleFilter child : filter.getChildren()) {
- collectColumnsRecursively(child, collector);
- }
- }
-
- private void collectColumns(TblColRef col, Set<TblColRef> collector) {
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- for (TblColRef h : hostInfo.columns)
- collector.add(h);
- } else {
- collector.add(col);
- }
- }
-
- private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
- boolean hasMemHungryCountDistinct = false;
- for (FunctionDesc func : metrics) {
- if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
- hasMemHungryCountDistinct = true;
- }
- }
-
- // need to limit the memory usage for memory hungry count distinct
- if (hasMemHungryCountDistinct == false) {
- return;
- }
-
- int rowSizeEst = dimensions.size() * 3;
- for (FunctionDesc func : metrics) {
- rowSizeEst += func.getReturnDataType().getSpaceEstimate();
- }
-
- long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
- context.setThreshold((int) rowEst);
- }
-
- private void setLimit(TupleFilter filter, StorageContext context) {
- boolean goodAggr = context.isExactAggregation();
- boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
- boolean goodSort = context.hasSort() == false;
- if (goodAggr && goodFilter && goodSort) {
- logger.info("Enable limit " + context.getLimit());
- context.enableLimit();
- }
- }
-
- // ============================================================================
-
- @Override
- public Range<Long> getVolatilePeriod() {
- return null;
- }
-
- @Override
- public String getStorageUUID() {
- return cubeInstance.getUuid();
- }
-
-
- @Override
- public boolean isDynamic() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4d3af3a9/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
new file mode 100644
index 0000000..8164310
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
@@ -0,0 +1,371 @@
+package org.apache.kylin.storage.cube;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.DerivedFilterTranslator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+public class CubeStorageQuery implements ICachableStorageQuery {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
+
+ private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
+
+ private final CubeInstance cubeInstance;
+ private final CubeDesc cubeDesc;
+
+ public CubeStorageQuery(CubeInstance cube) {
+ this.cubeInstance = cube;
+ this.cubeDesc = cube.getDescriptor();
+ }
+
+ @Override
+ public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+ TupleFilter filter = sqlDigest.filter;
+
+ // build dimension & metrics
+ Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
+ Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+ buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
+
+ // all dimensions = groups + filter dimensions
+ Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
+ filterDims.removeAll(groups);
+
+ // expand derived (xxxD means contains host columns only, derived columns were translated)
+ Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+ Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+ Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
+ filterDimsD.removeAll(groupsD);
+ derivedPostAggregation.removeAll(groups);
+
+ // identify cuboid
+ Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+ dimensionsD.addAll(groupsD);
+ dimensionsD.addAll(filterDimsD);
+ Cuboid cuboid = identifyCuboid(dimensionsD);
+ context.setCuboid(cuboid);
+
+ // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
+ Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+ boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
+ context.setExactAggregation(isExactAggregation);
+
+ if (isExactAggregation) {
+ metrics = replaceHolisticCountDistinct(metrics);
+ }
+
+ // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
+ TupleFilter filterD = translateDerived(filter, groupsD);
+
+ setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+ // TODO enable coprocessor
+// setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
+ setLimit(filter, context);
+
+ List<CubeScanner> scanners = Lists.newArrayList();
+ for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+ scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD));
+ }
+
+ if (scanners.isEmpty())
+ return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+
+ return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
+ }
+
+ private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+ for (FunctionDesc func : sqlDigest.aggregations) {
+ if (!func.isDimensionAsMetric()) {
+ // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
+ metrics.add(findAggrFuncFromCubeDesc(func));
+ }
+ }
+
+ for (TblColRef column : sqlDigest.allColumns) {
+ // skip measure columns
+ if (sqlDigest.metricColumns.contains(column)) {
+ continue;
+ }
+ dimensions.add(column);
+ }
+ }
+
+ private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ if (measure.getFunction().equals(aggrFunc))
+ return measure.getFunction();
+ }
+ return aggrFunc;
+ }
+
+ private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+ Set<TblColRef> expanded = Sets.newHashSet();
+ for (TblColRef col : cols) {
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ for (TblColRef hostCol : hostInfo.columns) {
+ expanded.add(hostCol);
+ if (hostInfo.isOneToOne == false)
+ derivedPostAggregation.add(hostCol);
+ }
+ } else {
+ expanded.add(col);
+ }
+ }
+ return expanded;
+ }
+
+ private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+ long cuboidID = 0;
+ for (TblColRef column : dimensions) {
+ int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+ cuboidID |= 1L << index;
+ }
+ return Cuboid.findById(cubeDesc, cuboidID);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+ Collection<? extends TupleFilter> toCheck;
+ if (filter instanceof CompareTupleFilter) {
+ toCheck = Collections.singleton(filter);
+ } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+ toCheck = filter.getChildren();
+ } else {
+ return (Set<TblColRef>) Collections.EMPTY_SET;
+ }
+
+ Set<TblColRef> result = Sets.newHashSet();
+ for (TupleFilter f : toCheck) {
+ if (f instanceof CompareTupleFilter) {
+ CompareTupleFilter compFilter = (CompareTupleFilter) f;
+ // is COL=const ?
+ if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+ result.add(compFilter.getColumn());
+ }
+ }
+ }
+
+ // expand derived
+ Set<TblColRef> resultD = Sets.newHashSet();
+ for (TblColRef col : result) {
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ if (hostInfo.isOneToOne) {
+ for (TblColRef hostCol : hostInfo.columns) {
+ resultD.add(hostCol);
+ }
+ }
+ //if not one2one, it will be pruned
+ } else {
+ resultD.add(col);
+ }
+ }
+ return resultD;
+ }
+
+ private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+ boolean exact = true;
+
+ if (cuboid.requirePostAggregation()) {
+ exact = false;
+ logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+ }
+
+ // derived aggregation is bad, unless expanded columns are already in group by
+ if (groups.containsAll(derivedPostAggregation) == false) {
+ exact = false;
+ logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+ }
+
+ // other columns (from filter) is bad, unless they are ensured to have single value
+ if (singleValuesD.containsAll(othersD) == false) {
+ exact = false;
+ logger.info("exactAggregation is false because some column not on group by: " + othersD //
+ + " (single value column: " + singleValuesD + ")");
+ }
+
+ if (exact) {
+ logger.info("exactAggregation is true");
+ }
+ return exact;
+ }
+
+ private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
+ // for count distinct, try use its holistic version if possible
+ Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
+ for (FunctionDesc metric : metrics) {
+ if (metric.isCountDistinct() == false) {
+ result.add(metric);
+ continue;
+ }
+
+ FunctionDesc holisticVersion = null;
+ for (MeasureDesc measure : cubeDesc.getMeasures()) {
+ FunctionDesc measureFunc = measure.getFunction();
+ if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
+ holisticVersion = measureFunc;
+ }
+ }
+ result.add(holisticVersion == null ? metric : holisticVersion);
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter == null)
+ return filter;
+
+ if (filter instanceof CompareTupleFilter) {
+ return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+ }
+
+ List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
+ List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
+ boolean modified = false;
+ for (TupleFilter child : children) {
+ TupleFilter translated = translateDerived(child, collector);
+ newChildren.add(translated);
+ if (child != translated)
+ modified = true;
+ }
+ if (modified) {
+ filter = replaceChildren(filter, newChildren);
+ }
+ return filter;
+ }
+
+ private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+ if (filter instanceof LogicalTupleFilter) {
+ LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
+ r.addChildren(newChildren);
+ return r;
+ } else
+ throw new IllegalStateException("Cannot replaceChildren on " + filter);
+ }
+
+ private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+ if (compf.getColumn() == null || compf.getValues().isEmpty())
+ return compf;
+
+ TblColRef derived = compf.getColumn();
+ if (cubeDesc.isDerived(derived) == false)
+ return compf;
+
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+ CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
+ CubeSegment seg = cubeInstance.getLatestReadySegment();
+ LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
+ Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+ TupleFilter translatedFilter = translated.getFirst();
+ boolean loosened = translated.getSecond();
+ if (loosened) {
+ collectColumnsRecursively(translatedFilter, collector);
+ }
+ return translatedFilter;
+ }
+
+ private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+ if (filter instanceof ColumnTupleFilter) {
+ collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+ }
+ for (TupleFilter child : filter.getChildren()) {
+ collectColumnsRecursively(child, collector);
+ }
+ }
+
+ private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+ if (cubeDesc.isDerived(col)) {
+ DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+ for (TblColRef h : hostInfo.columns)
+ collector.add(h);
+ } else {
+ collector.add(col);
+ }
+ }
+
+ private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
+ boolean hasMemHungryCountDistinct = false;
+ for (FunctionDesc func : metrics) {
+ if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+ hasMemHungryCountDistinct = true;
+ }
+ }
+
+ // need to limit the memory usage for memory hungry count distinct
+ if (hasMemHungryCountDistinct == false) {
+ return;
+ }
+
+ int rowSizeEst = dimensions.size() * 3;
+ for (FunctionDesc func : metrics) {
+ rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+ }
+
+ long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
+ context.setThreshold((int) rowEst);
+ }
+
+ private void setLimit(TupleFilter filter, StorageContext context) {
+ boolean goodAggr = context.isExactAggregation();
+ boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+ boolean goodSort = context.hasSort() == false;
+ if (goodAggr && goodFilter && goodSort) {
+ logger.info("Enable limit " + context.getLimit());
+ context.enableLimit();
+ }
+ }
+
+ // ============================================================================
+
+ @Override
+ public Range<Long> getVolatilePeriod() {
+ return null;
+ }
+
+ @Override
+ public String getStorageUUID() {
+ return cubeInstance.getUuid();
+ }
+
+
+ @Override
+ public boolean isDynamic() {
+ return false;
+ }
+
+}
[5/5] incubator-kylin git commit: KYLIN-878 half way
Posted by li...@apache.org.
KYLIN-878 half way
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9a82f39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9a82f39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9a82f39b
Branch: refs/heads/KYLIN-878
Commit: 9a82f39bdfc60347e259547be6460f958986a0e8
Parents: 4d3af3a
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Jul 20 08:59:10 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Jul 20 08:59:57 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/BytesSplitter.java | 9 +-
.../apache/kylin/engine/BuildEngineFactory.java | 2 +-
.../kylin/engine/mr/BatchCubingJobBuilder.java | 10 +-
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 100 +++++++++++++++++
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 102 ++++++++++++++++++
.../org/apache/kylin/engine/mr/IMROutput.java | 16 ++-
.../org/apache/kylin/engine/mr/IMROutput2.java | 53 +++++++++
.../kylin/engine/mr/JobBuilderSupport.java | 15 +--
.../kylin/engine/mr/MRBatchCubingEngine2.java | 47 ++++++++
.../java/org/apache/kylin/engine/mr/MRUtil.java | 10 ++
.../engine/mr2/BatchCubingJobBuilder2.java | 102 ------------------
.../kylin/engine/mr2/BatchMergeJobBuilder2.java | 107 -------------------
.../org/apache/kylin/engine/mr2/IMROutput2.java | 5 -
.../kylin/engine/mr2/MRBatchCubingEngine2.java | 48 ---------
.../apache/kylin/job/hadoop/cube/CuboidJob.java | 45 +++++---
.../job/hadoop/cube/HiveToBaseCuboidMapper.java | 28 ++++-
.../kylin/storage/hbase/HBaseMROutput2.java | 71 ++++++++++++
17 files changed, 467 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
index bd16246..7249dcf 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author xjiang
*/
public class BytesSplitter {
private static final Logger logger = LoggerFactory.getLogger(BytesSplitter.class);
@@ -79,6 +78,14 @@ public class BytesSplitter {
return bufferSize;
}
+
+ public void setBuffers(byte[][] buffers) {
+ for (int i = 0; i < buffers.length; i++) {
+ splitBuffers[i].value = buffers[i];
+ splitBuffers[i].length = buffers[i].length;
+ }
+ this.bufferSize = buffers.length;
+ }
public byte inferByteRowDelimiter(byte[] bytes, int byteLen, int expectedSplits) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index 721b85a..d24c99c 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.MRBatchCubingEngine;
-import org.apache.kylin.engine.mr2.MRBatchCubingEngine2;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
public class BuildEngineFactory {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 0ff3bef..a39ac74 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,7 +19,6 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.job.common.MapReduceExecutable;
@@ -42,13 +41,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
- final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
// Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
+ result.addTask(createFactDistinctColumnsStep(jobId));
result.addTask(createBuildDictionaryStep(jobId));
// Phase 3: Build Cube
@@ -56,7 +54,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
// base cuboid step
- result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
+ result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
// n dim cuboid steps
for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
int dimNum = totalRowkeyColumnsCount - i;
@@ -72,7 +70,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
return result;
}
- private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
+ private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
// base cuboid job
MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
@@ -83,7 +81,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+ appendExecCmdParameters(cmd, "input", ""); // marks flat table input
appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "level", "0");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..a83c596
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
+import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+ private final IMRBatchCubingInputSide inputSide;
+ private final IMRBatchCubingOutputSide2 outputSide;
+
+ public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+ super(newSegment, submitter);
+ this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+ }
+
+ public CubingJob build() {
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final String jobId = result.getId();
+ final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+
+ // Phase 1: Create Flat Table
+ inputSide.addStepPhase1_CreateFlatTable(result);
+
+ // Phase 2: Build Dictionary
+ result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+ result.addTask(createBuildDictionaryStep(jobId));
+ result.addTask(createSaveStatisticsStep(jobId));
+ outputSide.addStepPhase2_BuildDictionary(result);
+
+ // Phase 3: Build Cube
+ result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+ inputSide.addStepPhase4_Cleanup(result);
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+ SaveStatisticsStep result = new SaveStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setStatisticsPath(getStatisticsPath(jobId));
+ return result;
+ }
+
+ private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
+ // base cuboid job
+ MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, seg);
+
+ baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+ appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "level", "0");
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+
+ baseCuboidStep.setMapReduceParams(cmd.toString());
+ baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
+ baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+ return baseCuboidStep;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..f97de13
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class BatchMergeJobBuilder2 extends JobBuilderSupport {
+
+ private final IMRBatchCubingOutputSide2 outputSide;
+
+ public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+ super(mergeSegment, submitter);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+ }
+
+ public CubingJob build() {
+ final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final String jobId = result.getId();
+
+ final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+ Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
+ final List<String> mergingSegmentIds = Lists.newArrayList();
+ final List<String> mergingHTables = Lists.newArrayList();
+ for (CubeSegment merging : mergingSegments) {
+ mergingSegmentIds.add(merging.getUuid());
+ mergingHTables.add(merging.getStorageLocationIdentifier());
+ }
+
+ // Phase 1: Merge Dictionary
+ result.addTask(createMergeDictionaryStep(mergingSegmentIds));
+ result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
+ outputSide.addStepPhase2_BuildDictionary(result);
+
+ // Phase 2: Merge Cube
+ String formattedTables = StringUtil.join(mergingHTables, ",");
+ result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 3: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+ MergeStatisticsStep result = new MergeStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setMergingSegmentIds(mergingSegmentIds);
+ result.setMergedStatisticsPath(mergedStatisticsFolder);
+ return result;
+ }
+
+ private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+ MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+ mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, seg);
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", inputTableNames);
+ appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+ appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+ mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+ mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
+ mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+ return mergeCuboidDataStep;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 8896a2e..bc6ee1f 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -37,7 +37,13 @@ public interface IMROutput {
*/
public interface IMRBatchCubingOutputSide {
- /** Add step that saves cuboid output from HDFS to storage. */
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
+ * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
+ */
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
/** Add step that does any necessary clean up. */
@@ -57,7 +63,13 @@ public interface IMROutput {
*/
public interface IMRBatchMergeOutputSide {
- /** Add step that saves cuboid output from HDFS to storage. */
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
+ * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
+ */
public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
/** Add step that does any necessary clean up. */
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..9aecba9
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput2 {
+
+ /** Return a helper to participate in batch cubing job flow. */
+ public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+
+ /**
+ * Participate in the batch cubing flow as the output side.
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface IMRBatchCubingOutputSide2 {
+
+ /** Add step that executes after build dictionary and before build cube. */
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+ /** Add step that executes after build cube. */
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ /** Return a helper to participate in batch merge job flow. */
+ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+
+ /**
+ * Participate in the batch merge flow as the output side.
+ *
+ * - Phase 1: Merge Dictionary
+ * - Phase 2: Merge Cube
+ * - Phase 3: Update Metadata & Cleanup
+ */
+ public interface IMRBatchMergeOutputSide2 {
+
+ /** Add step that executes after merge dictionary and before merge cube. */
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ /** Add step that executes after merge cube. */
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 4652269..42d30c8 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.job.common.HadoopShellExecutable;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -49,15 +48,15 @@ public class JobBuilderSupport {
this.submitter = submitter;
}
- public MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, false);
+ public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
+ return createFactDistinctColumnsStep(jobId, false);
}
- public MapReduceExecutable createFactDistinctColumnsStepWithStats(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- return createFactDistinctColumnsStep(flatHiveTableDesc, jobId, true);
+ public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
+ return createFactDistinctColumnsStep(jobId, true);
}
- private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId, boolean withStats) {
+ private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
MapReduceExecutable result = new MapReduceExecutable();
result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
result.setMapReduceJobClass(FactDistinctColumnsJob.class);
@@ -142,10 +141,6 @@ public class JobBuilderSupport {
}
}
- public String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
- return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
- }
-
public String getFactDistinctColumnsPath(String jobId) {
return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
new file mode 100644
index 0000000..57ec128
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class MRBatchCubingEngine2 implements IBatchCubingEngine {
+
+ @Override
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return new BatchCubingJobBuilder2(newSegment, submitter).build();
+ }
+
+ @Override
+ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
+ }
+
+ @Override
+ public Class<?> getSourceInterface() {
+ return IMRInput.class;
+ }
+
+ @Override
+ public Class<?> getStorageInterface() {
+ return IMROutput2.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 099a614..4c44af7 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -6,6 +6,8 @@ import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.TableSourceFactory;
@@ -42,4 +44,12 @@ public class MRUtil {
return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput.class).getBatchMergeOutputSide(seg);
}
+ public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
+ return StorageFactory2.createEngineAdapter(seg.getCubeInstance(), IMROutput2.class).getBatchMergeOutputSide(seg);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
deleted file mode 100644
index e83db30..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchCubingJobBuilder2.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-
-public class BatchCubingJobBuilder2 extends JobBuilderSupport {
-
- private final IMRBatchCubingInputSide inputSide;
-
- public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
- super(newSegment, submitter);
- this.inputSide = MRUtil.getBatchCubingInputSide(seg);
- }
-
- public CubingJob build() {
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
- final String jobId = result.getId();
- final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-
- // Phase 1: Create Flat Table
- inputSide.addStepPhase1_CreateFlatTable(result);
-
- // Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStepWithStats(flatHiveTableDesc, jobId));
- result.addTask(createBuildDictionaryStep(jobId));
-
- // Phase 3: Build Cube
- result.addTask(createSaveStatisticsStep(jobId)); //<<<<<
-
- // create htable step
- result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
- result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
- // bulk load step
- result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
- // Phase 4: Update Metadata & Cleanup
- result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
- inputSide.addStepPhase4_Cleanup(result);
-
- return result;
- }
-
- private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
- SaveStatisticsStep result = new SaveStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setStatisticsPath(getStatisticsPath(jobId));
- return result;
- }
-
- private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- // base cuboid job
- MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
-
- baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "level", "0");
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
- baseCuboidStep.setMapReduceParams(cmd.toString());
- baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
- return baseCuboidStep;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
deleted file mode 100644
index 25ff082..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/BatchMergeJobBuilder2.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
-
- public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
- super(mergeSegment, submitter);
- }
-
- public CubingJob build() {
- final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
- final String jobId = result.getId();
-
- final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
- final List<String> mergingSegmentIds = Lists.newArrayList();
- final List<String> mergingCuboidPaths = Lists.newArrayList();
- final List<String> mergingHTables = Lists.newArrayList();
- for (CubeSegment merging : mergingSegments) {
- mergingSegmentIds.add(merging.getUuid());
- mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
- mergingHTables.add(merging.getStorageLocationIdentifier());
- }
-
- result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
- String mergedStatisticsFolder = getStatisticsPath(jobId);
- result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
- // create htable step
- result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
-
- String formattedTables = StringUtil.join(mergingHTables, ",");
- result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
- // bulk load step
- result.addTask(createBulkLoadStep(jobId)); //<<<<<
-
- // update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
-
- result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
-
- return result;
- }
-
- private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
- MergeStatisticsStep result = new MergeStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setMergedStatisticsPath(mergedStatisticsFolder);
- return result;
- }
-
- private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
- MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
- mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", inputTableNames);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
- mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
- mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
- return mergeCuboidDataStep;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
deleted file mode 100644
index aeddb9b..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/IMROutput2.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.kylin.engine.mr2;
-
-public interface IMROutput2 {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
deleted file mode 100644
index 8ec6f69..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr2/MRBatchCubingEngine2.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr2;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine2 implements IBatchCubingEngine {
-
- @Override
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return new BatchCubingJobBuilder2(newSegment, submitter).build();
- }
-
- @Override
- public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
- return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
- }
-
- @Override
- public Class<?> getSourceInterface() {
- return IMRInput.class;
- }
-
- @Override
- public Class<?> getStorageInterface() {
- return IMROutput2.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index 87fc188..46bb9fc 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.hadoop.cube;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -32,11 +33,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidCLI;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +60,9 @@ public class CuboidJob extends AbstractHadoopJob {
@Override
public int run(String[] args) throws Exception {
+ if (this.mapperClass == null)
+ throw new Exception("Mapper class is not set!");
+
Options options = new Options();
try {
@@ -67,7 +75,6 @@ public class CuboidJob extends AbstractHadoopJob {
options.addOption(OPTION_INPUT_FORMAT);
parseOptions(options, args);
- Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
@@ -79,26 +86,11 @@ public class CuboidJob extends AbstractHadoopJob {
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
logger.info("Starting: " + job.getJobName());
- FileInputFormat.setInputPaths(job, input);
setJobClasspath(job);
// Mapper
- if (this.mapperClass == null) {
- throw new Exception("Mapper class is not set!");
- }
-
- boolean isInputTextFormat = false;
- if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
- isInputTextFormat = true;
- }
-
- if (isInputTextFormat) {
- job.setInputFormatClass(TextInputFormat.class);
-
- } else {
- job.setInputFormatClass(SequenceFileInputFormat.class);
- }
+ configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
job.setMapperClass(this.mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
@@ -130,6 +122,25 @@ public class CuboidJob extends AbstractHadoopJob {
}
}
+ private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+ String input = getOptionValue(OPTION_INPUT_PATH);
+
+ if (StringUtils.isBlank(input)) {
+ // base cuboid case
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
+ }
+ else {
+ // n-dimension cuboid case
+ FileInputFormat.setInputPaths(job, new Path(input));
+ if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
+ job.setInputFormatClass(TextInputFormat.class);
+ } else {
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ }
+ }
+ }
+
protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
Configuration jobConf = job.getConfiguration();
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
index 599dde8..9fa1159 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
@@ -19,25 +19,37 @@
package org.apache.kylin.job.hadoop.cube;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
-import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.job.constant.BatchConstants;
/**
* @author George Song (ysong1)
*/
-public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Text> {
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Object> {
+
+ private IMRTableInputFormat flatTableInputFormat;
@Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+ protected void setup(Context context) throws IOException {
+ super.setup(context);
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
+ }
+
+ @Override
+ public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
counter++;
if (counter % BatchConstants.COUNTER_MAX == 0) {
logger.info("Handled " + counter + " records!");
}
+
try {
//put a record into the shared bytesSplitter
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
+ String[] row = flatTableInputFormat.parseMapperInput(value);
+ bytesSplitter.setBuffers(convertUTF8Bytes(row));
//take care of the data in bytesSplitter
outputKV(context);
@@ -46,4 +58,12 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, T
}
}
+ private byte[][] convertUTF8Bytes(String[] row) throws UnsupportedEncodingException {
+ byte[][] result = new byte[row.length][];
+ for (int i = 0; i < row.length; i++) {
+ result[i] = row[i].getBytes("UTF-8");
+ }
+ return result;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a82f39b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
new file mode 100644
index 0000000..63e5902
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class HBaseMROutput2 implements IMROutput2 {
+
+ @Override
+ public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+ return new IMRBatchCubingOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+ result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ result.addTask(createBulkLoadStep(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ // nothing to do
+ }
+ };
+ }
+
+ @Override
+ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
+ return new IMRBatchMergeOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ result.addTask(createCreateHTableStepWithStats(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+ result.addTask(createBulkLoadStep(jobId)); //<<<<<
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+ result.addTask(createGarbageCollectionStep(mergingHTables, null)); //<<<<<
+ }
+ };
+ }
+}