You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/12/25 14:49:18 UTC
[2/6] kylin git commit: spark cubing init commit
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 4d89e1a..ce5888d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,27 +18,27 @@
package org.apache.kylin.metadata.model;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class FunctionDesc {
+public class FunctionDesc implements Serializable {
public static FunctionDesc newInstance(String expression, ParameterDesc param, String returnType) {
FunctionDesc r = new FunctionDesc();
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
index 6489244..dd1500b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
@@ -18,17 +18,18 @@
package org.apache.kylin.metadata.model;
-import java.util.Arrays;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.Arrays;
+
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinDesc {
+public class JoinDesc implements Serializable {
// inner, left, right, outer...
@JsonProperty("type")
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
index 5d0409a..51e5787 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
@@ -25,8 +25,10 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinTableDesc {
+public class JoinTableDesc implements Serializable {
@JsonProperty("table")
private String table;
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index a0b267d..c132d0e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -18,15 +18,16 @@
package org.apache.kylin.metadata.model;
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import com.google.common.base.Preconditions;
-
-public class JoinsTree {
+public class JoinsTree implements Serializable {
final Map<String, Chain> tableChains = new LinkedHashMap<>();
@@ -111,7 +112,7 @@ public class JoinsTree {
return chain.join;
}
- static class Chain {
+ static class Chain implements java.io.Serializable {
TableRef table; // pk side
JoinDesc join;
Chain fkSide;
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 253b06b..c0719d2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -18,18 +18,19 @@
package org.apache.kylin.metadata.model;
-import java.util.Objects;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import java.util.Objects;
+
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class MeasureDesc {
+public class MeasureDesc implements Serializable {
@JsonProperty("name")
private String name;
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
index 6460f71..3c5c5f1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
@@ -18,17 +18,17 @@
package org.apache.kylin.metadata.model;
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.util.StringUtil;
+
+import java.io.Serializable;
+import java.util.List;
/**
*/
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ModelDimensionDesc {
+public class ModelDimensionDesc implements Serializable {
@JsonProperty("table")
private String table;
@JsonProperty("columns")
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index c14d061..8ad20a8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,21 +18,22 @@
package org.apache.kylin.metadata.model;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
-
import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ParameterDesc {
+public class ParameterDesc implements Serializable {
public static ParameterDesc newInstance(Object... objs) {
if (objs.length == 0)
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 9925990..c6e6425 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -27,10 +27,12 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+
/**
*/
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class PartitionDesc {
+public class PartitionDesc implements Serializable {
public static enum PartitionType {
APPEND, //
@@ -175,7 +177,7 @@ public class PartitionDesc {
String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive);
}
- public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder {
+ public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, java.io.Serializable {
@Override
public String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index 7089eba..0d9b442 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -18,25 +18,28 @@
package org.apache.kylin.metadata.model;
+import com.google.common.collect.Maps;
+
+import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import com.google.common.collect.Maps;
+public class TableRef implements Serializable {
-public class TableRef {
-
- final private DataModelDesc model;
+ final transient private DataModelDesc model;
final private String alias;
final private TableDesc table;
final private Map<String, TblColRef> columns;
+ final private String modelName;
TableRef(DataModelDesc model, String alias, TableDesc table) {
this.model = model;
+ this.modelName = model.getName();
this.alias = alias;
this.table = table;
this.columns = Maps.newLinkedHashMap();
-
+
for (ColumnDesc col : table.getColumns()) {
columns.put(col.getName(), new TblColRef(this, col));
}
@@ -94,7 +97,7 @@ public class TableRef {
TableRef t = (TableRef) o;
- if ((model == null ? t.model == null : model.getName().equals(t.model.getName())) == false)
+ if ((modelName == null ? t.modelName != null : modelName.equals(t.modelName)) == false)
return false;
if ((alias == null ? t.alias == null : alias.equals(t.alias)) == false)
return false;
@@ -107,7 +110,7 @@ public class TableRef {
@Override
public int hashCode() {
int result = 0;
- result = 31 * result + model.getName().hashCode();
+ result = 31 * result + modelName.hashCode();
result = 31 * result + alias.hashCode();
result = 31 * result + table.getIdentity().hashCode();
return result;
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 6eba3c2..0da2a8e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -32,6 +32,7 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +77,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
- private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+ protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
final int groupRowkeyColumnsCount = ((CubeSegment) seg).getCubeDesc().getBuildLevel();
final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
@@ -100,7 +101,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
- private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
+ protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
// base cuboid job
MapReduceExecutable cubeStep = new MapReduceExecutable();
@@ -117,7 +118,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
cubeStep.setMapReduceParams(cmd.toString());
cubeStep.setMapReduceJobClass(getInMemCuboidJob());
- cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+// cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
return cubeStep;
}
@@ -144,7 +145,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+// baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return baseCuboidStep;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 5f5814b..47695b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -75,6 +75,7 @@ public class JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
result.setMapReduceParams(cmd.toString());
+ result.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return result;
}
@@ -192,7 +193,7 @@ public class JobBuilderSupport {
return buf.append(" -").append(paraName).append(" ").append(paraValue);
}
- public String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+ public static String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
String[] paths = new String[groupRowkeyColumnsCount + 1];
for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
int dimNum = totalRowkeyColumnCount - i;
@@ -205,4 +206,13 @@ public class JobBuilderSupport {
return paths;
}
+ public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
+ if (level == 0) {
+ return cuboidRootPath + "base_cuboid";
+ } else {
+ return cuboidRootPath + level + "level_cuboid";
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
new file mode 100644
index 0000000..8d8496c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+public class BaseCuboidBuilder implements java.io.Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
+ public static final String HIVE_NULL = "\\N";
+ public static final byte[] ONE = Bytes.toBytes("1");
+ protected String cubeName;
+ protected Cuboid baseCuboid;
+ protected CubeDesc cubeDesc;
+ protected CubeSegment cubeSegment;
+ protected Set<String> nullStrs;
+ protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+ protected MeasureIngester<?>[] aggrIngesters;
+ protected Map<TblColRef, Dictionary<String>> dictionaryMap;
+ protected AbstractRowKeyEncoder rowKeyEncoder;
+ protected BufferedMeasureCodec measureCodec;
+
+ protected KylinConfig kylinConfig;
+
+ public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc,
+ AbstractRowKeyEncoder rowKeyEncoder, BufferedMeasureCodec measureCodec, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+ this.kylinConfig = kylinConfig;
+ this.cubeDesc = cubeDesc;
+ this.cubeSegment = cubeSegment;
+ this.intermediateTableDesc = intermediateTableDesc;
+ this.rowKeyEncoder = rowKeyEncoder;
+ this.measureCodec = measureCodec;
+ this.aggrIngesters = aggrIngesters;
+ this.dictionaryMap = dictionaryMap;
+
+ init();
+ }
+
+ public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) {
+ this.kylinConfig = kylinConfig;
+ this.cubeDesc = cubeDesc;
+ this.cubeSegment = cubeSegment;
+ this.intermediateTableDesc = intermediateTableDesc;
+
+ rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+ measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+ aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+ dictionaryMap = cubeSegment.buildDictionaryMap();
+
+ init();
+ }
+
+ private void init() {
+ long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+ initNullBytes();
+ }
+
+ private void initNullBytes() {
+ nullStrs = Sets.newHashSet();
+ nullStrs.add(HIVE_NULL);
+ String[] nullStrings = cubeDesc.getNullStrings();
+ if (nullStrings != null) {
+ for (String s : nullStrings) {
+ nullStrs.add(s);
+ }
+ }
+ }
+
+ protected boolean isNull(String v) {
+ return nullStrs.contains(v);
+ }
+
+ public byte[] buildKey(String[] flatRow) {
+ int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+ List<TblColRef> columns = baseCuboid.getColumns();
+ String[] colValues = new String[columns.size()];
+ for (int i = 0; i < columns.size(); i++) {
+ colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
+ }
+ return rowKeyEncoder.encode(colValues);
+ }
+
+ public ByteBuffer buildValue(String[] flatRow) {
+ return measureCodec.encode(buildValueObjects(flatRow));
+ }
+
+ public Object[] buildValueObjects(String[] flatRow) {
+ Object[] measures = new Object[cubeDesc.getMeasures().size()];
+ for (int i = 0; i < measures.length; i++) {
+ measures[i] = buildValueOf(i, flatRow);
+ }
+
+ return measures;
+ }
+
+ public void resetAggrs() {
+ for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+ aggrIngesters[i].reset();
+ }
+ }
+
+ private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
+ MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+ FunctionDesc function = measure.getFunction();
+ int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+ int paramCount = function.getParameterCount();
+ String[] inputToMeasure = new String[paramCount];
+
+ // pick up parameter values
+ ParameterDesc param = function.getParameter();
+ int colParamIdx = 0; // index among parameters of column type
+ for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+ String value;
+ if (function.isCount()) {
+ value = "1";
+ } else if (param.isColumnType()) {
+ value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
+ } else {
+ value = param.getValue();
+ }
+ inputToMeasure[i] = value;
+ }
+
+ return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+ }
+
+ private String getCell(int i, String[] flatRow) {
+ if (isNull(flatRow[i]))
+ return null;
+ else
+ return flatRow[i];
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
new file mode 100644
index 0000000..4e98618
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ */
+public class NDCuboidBuilder implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(NDCuboidBuilder.class);
+ protected String cubeName;
+ protected String segmentID;
+ protected CubeSegment cubeSegment;
+ private RowKeySplitter rowKeySplitter;
+ private RowKeyEncoderProvider rowKeyEncoderProvider;
+ private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+ private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+ public NDCuboidBuilder(CubeSegment cubeSegment) {
+ this.cubeSegment = cubeSegment;
+ this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+ this.rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
+ }
+
+ public NDCuboidBuilder(CubeSegment cubeSegment, RowKeyEncoderProvider rowKeyEncoderProvider) {
+ this.cubeSegment = cubeSegment;
+ this.rowKeyEncoderProvider = rowKeyEncoderProvider;
+ this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+ }
+
+
+ public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
+ RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
+
+ int offset = 0;
+
+ // rowkey columns
+ long mask = Long.highestOneBit(parentCuboid.getId());
+ long parentCuboidId = parentCuboid.getId();
+ long childCuboidId = childCuboid.getId();
+ long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
+ int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+ for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & parentCuboidId) > 0) {// if this bit position equals
+                // 1
+ if ((mask & childCuboidId) > 0) {// if the child cuboid has this
+ // column
+ System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
+ offset += splitBuffers[index].length;
+ }
+ index++;
+ }
+ mask = mask >> 1;
+ }
+
+ int fullKeySize = rowkeyEncoder.getBytesLength();
+ while (newKeyBuf.array().length < fullKeySize) {
+ newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
+ }
+ newKeyBuf.set(0, fullKeySize);
+
+ rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+ return new Pair<>(Integer.valueOf(fullKeySize), newKeyBuf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 7b719e0..d08e29a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -18,38 +18,25 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
/**
*/
@@ -59,131 +46,37 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
public static final byte[] ONE = Bytes.toBytes("1");
protected String cubeName;
protected String segmentID;
- protected Cuboid baseCuboid;
protected CubeInstance cube;
protected CubeDesc cubeDesc;
protected CubeSegment cubeSegment;
- protected Set<String> nullStrs;
- protected CubeJoinedFlatTableEnrich intermediateTableDesc;
- protected String intermediateTableRowDelimiter;
- protected byte byteRowDelimiter;
protected int counter;
- protected MeasureIngester<?>[] aggrIngesters;
- protected Map<TblColRef, Dictionary<String>> dictionaryMap;
protected Object[] measures;
- protected AbstractRowKeyEncoder rowKeyEncoder;
- protected BufferedMeasureCodec measureCodec;
private int errorRecordCounter;
protected Text outputKey = new Text();
protected Text outputValue = new Text();
+ private BaseCuboidBuilder baseCuboidBuilder;
+
@Override
protected void setup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
-
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
- intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
- if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
- throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
- }
-
- byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- cube = CubeManager.getInstance(config).getCube(cubeName);
+ final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+ cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
cubeDesc = cube.getDescriptor();
cubeSegment = cube.getSegmentById(segmentID);
+ CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+ baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc);
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
- measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
- measures = new Object[cubeDesc.getMeasures().size()];
-
- aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
- dictionaryMap = cubeSegment.buildDictionaryMap();
-
- initNullBytes();
- }
-
- private void initNullBytes() {
- nullStrs = Sets.newHashSet();
- nullStrs.add(HIVE_NULL);
- String[] nullStrings = cubeDesc.getNullStrings();
- if (nullStrings != null) {
- for (String s : nullStrings) {
- nullStrs.add(s);
- }
- }
- }
-
- protected boolean isNull(String v) {
- return nullStrs.contains(v);
- }
-
- protected byte[] buildKey(String[] flatRow) {
- int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- List<TblColRef> columns = baseCuboid.getColumns();
- String[] colValues = new String[columns.size()];
- for (int i = 0; i < columns.size(); i++) {
- colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
- }
- return rowKeyEncoder.encode(colValues);
}
- private ByteBuffer buildValue(String[] flatRow) {
-
- for (int i = 0; i < measures.length; i++) {
- measures[i] = buildValueOf(i, flatRow);
- }
-
- return measureCodec.encode(measures);
- }
-
- private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
- MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
- FunctionDesc function = measure.getFunction();
- int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
- int paramCount = function.getParameterCount();
- String[] inputToMeasure = new String[paramCount];
-
- // pick up parameter values
- ParameterDesc param = function.getParameter();
- int colParamIdx = 0; // index among parameters of column type
- for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
- String value;
- if (function.isCount()) {
- value = "1";
- } else if (param.isColumnType()) {
- value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
- } else {
- value = param.getValue();
- }
- inputToMeasure[i] = value;
- }
-
- return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
- }
-
- private String getCell(int i, String[] flatRow) {
- if (isNull(flatRow[i]))
- return null;
- else
- return flatRow[i];
- }
protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
- byte[] rowKey = buildKey(flatRow);
+ byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
outputKey.set(rowKey, 0, rowKey.length);
- ByteBuffer valueBuf = buildValue(flatRow);
+ ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
context.write(outputKey, outputValue);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 01cdd4a..b924edc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -18,29 +18,27 @@
package org.apache.kylin.engine.mr.steps;
-import java.io.IOException;
-import java.util.Collection;
-
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
+
/**
* @author George Song (ysong1)
*
@@ -59,10 +57,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private int handleCounter;
private int skipCounter;
- private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
- private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
private RowKeySplitter rowKeySplitter;
- private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+ private NDCuboidBuilder ndCuboidBuilder;
@Override
protected void setup(Context context) throws IOException {
@@ -76,48 +73,13 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
cubeSegment = cube.getSegmentById(segmentID);
cubeDesc = cube.getDescriptor();
-
+ ndCuboidBuilder = new NDCuboidBuilder(cubeSegment);
// initialize CuboidScheduler
cuboidScheduler = new CuboidScheduler(cubeDesc);
-
rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
- rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
}
- private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
- RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
-
- int offset = 0;
-
- // rowkey columns
- long mask = Long.highestOneBit(parentCuboid.getId());
- long parentCuboidId = parentCuboid.getId();
- long childCuboidId = childCuboid.getId();
- long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
- int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
- for (int i = 0; i < parentCuboidIdActualLength; i++) {
- if ((mask & parentCuboidId) > 0) {// if the this bit position equals
- // 1
- if ((mask & childCuboidId) > 0) {// if the child cuboid has this
- // column
- System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
- offset += splitBuffers[index].length;
- }
- index++;
- }
- mask = mask >> 1;
- }
- int fullKeySize = rowkeyEncoder.getBytesLength();
- while (newKeyBuf.array().length < fullKeySize) {
- newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
- }
- newKeyBuf.set(0, fullKeySize);
-
- rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
-
- return fullKeySize;
- }
@Override
public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
@@ -143,8 +105,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
for (Long child : myChildren) {
Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
- int fullKeySize = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
- outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+ Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+ outputKey.set(result.getSecond().array(), 0, result.getFirst());
context.write(outputKey, value);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index a7cffdd..264f4c9 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -47,6 +47,11 @@
<artifactId>kylin-core-job</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-mr</artifactId>
+ </dependency>
+
<!-- Spark dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
new file mode 100644
index 0000000..a7a4151
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ */
+public class SparkBatchCubingEngine2 extends MRBatchCubingEngine2 {
+ @Override
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+ return new SparkBatchCubingJobBuilder2(newSegment, submitter).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
new file mode 100644
index 0000000..57b6432
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
+
+ private static final Logger logger = LoggerFactory.getLogger(SparkBatchCubingJobBuilder2.class);
+
+ public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+ super(newSegment, submitter);
+ }
+
+ protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+
+ }
+
+ @Override
+ protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
+ IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ final SparkExecutable sparkExecutable = new SparkExecutable();
+ sparkExecutable.setClassName(SparkCubingV3.class.getName());
+ sparkExecutable.setParam(SparkCubingV3.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkCubingV3.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkCubingV3.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName());
+ sparkExecutable.setParam(SparkCubingV3.OPTION_CONF_PATH.getOpt(), "/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/"); //FIXME
+ sparkExecutable.setParam(SparkCubingV3.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration")); // htrace-core.jar
+ StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration"));
+ StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.client.HConnection")); // hbase-client.jar
+ StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.HBaseConfiguration")); // hbase-common.jar
+ StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.util.ByteStringer")); // hbase-protocol.jar
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+ sparkExecutable.setJars(jars.toString());
+ // sparkExecutable.setJars("/Users/shishaofeng/.m2/repository/org/cloudera/htrace/htrace-core/2.01/htrace-core-2.01.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-protocol/0.98.8-hadoop2/hbase-protocol-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-common/0.98.8-hadoop2/hbase-common-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2//repository/org/apache/hbase/hbase-client/0.98.8-hadoop2/hbase-client-0.98.8-hadoop2.jar");
+
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE + " with Spark");
+ return sparkExecutable;
+
+ }
+
+ private String findJar(String className) {
+ try {
+ return ClassUtil.findContainingJar(Class.forName(className));
+ } catch (ClassNotFoundException e) {
+ logger.error("failed to locate jar for class " + className, e);
+ }
+
+ return "";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 6e894dd..4dd2276 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -84,6 +84,7 @@ import org.apache.kylin.engine.spark.util.IteratorUtils;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -153,6 +154,20 @@ public class SparkCubing extends AbstractApplication {
return options;
}
+ public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException {
+ File metaDir = new File(folder);
+ if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) {
+ System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+ logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath());
+ kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
+ return kylinConfig;
+ } else {
+ return KylinConfig.getInstanceFromEnv();
+ }
+ }
+
private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
ClassUtil.addClasspath(confPath);
final File[] files = new File(confPath).listFiles(new FileFilter() {
@@ -462,7 +477,7 @@ public class SparkCubing extends AbstractApplication {
}).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
}
- private static void prepare() throws Exception {
+ public static void prepare() throws Exception {
final File file = new File(SparkFiles.get("kylin.properties"));
final String confPath = file.getParentFile().getAbsolutePath();
System.out.println("conf directory:" + confPath);
@@ -526,12 +541,18 @@ public class SparkCubing extends AbstractApplication {
}
}
- private Collection<String> getKyroClasses() {
+ public static Collection<String> getKyroClasses() {
Set<Class> kyroClasses = Sets.newHashSet();
kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class));
kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class));
kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class));
kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class));
+ kyroClasses.addAll(new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class));
+
kyroClasses.add(HashMap.class);
kyroClasses.add(org.apache.spark.sql.Row[].class);
kyroClasses.add(org.apache.spark.sql.Row.class);
@@ -541,11 +562,15 @@ public class SparkCubing extends AbstractApplication {
kyroClasses.add(org.apache.spark.sql.types.StructField.class);
kyroClasses.add(org.apache.spark.sql.types.DateType$.class);
kyroClasses.add(org.apache.spark.sql.types.Metadata.class);
- kyroClasses.add(Object[].class);
kyroClasses.add(org.apache.spark.sql.types.StringType$.class);
kyroClasses.add(Hashing.murmur3_128().getClass());
- kyroClasses.add(org.apache.spark.sql.columnar.CachedBatch.class);
+ kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class);
+ kyroClasses.add(Object[].class);
+ kyroClasses.add(int[].class);
+ kyroClasses.add(byte[].class);
kyroClasses.add(byte[][].class);
+ kyroClasses.add(String[].class);
+ kyroClasses.add(String[][].class);
kyroClasses.add(org.apache.spark.sql.types.Decimal.class);
kyroClasses.add(scala.math.BigDecimal.class);
kyroClasses.add(java.math.BigDecimal.class);
@@ -553,6 +578,52 @@ public class SparkCubing extends AbstractApplication {
kyroClasses.add(java.math.RoundingMode.class);
kyroClasses.add(java.util.ArrayList.class);
kyroClasses.add(java.util.LinkedList.class);
+ kyroClasses.add(java.util.HashSet.class);
+ kyroClasses.add(java.util.LinkedHashSet.class);
+ kyroClasses.add(java.util.LinkedHashMap.class);
+ kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class);
+
+ kyroClasses.add(java.util.HashMap.class);
+ kyroClasses.add(java.util.Properties.class);
+ kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class);
+ kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class);
+ kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class);
+ kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class);
+ kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class);
+ kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class);
+ kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class);
+ kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class);
+ kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class);
+ kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
+ kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
+ kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class);
+ kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
+ kyroClasses.add(org.apache.kylin.common.util.Array.class);
+ kyroClasses.add(org.apache.kylin.metadata.model.Segments.class);
+ kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class);
+ kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class);
+ kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class);
+ kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class);
+ kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class);
+ kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class);
+ kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class);
+ kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class);
+ kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class);
+ kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class);
+ kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class);
+ kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class);
+ kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class);
+ kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class);
+ kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class);
+ kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class);
+ kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class);
ArrayList<String> result = Lists.newArrayList();
for (Class kyroClass : kyroClasses) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java
new file mode 100644
index 0000000..6f2915a
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.hive.HiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses;
+
+/**
+ */
+public class SparkCubingV3 extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
+
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+
+ private Options options;
+
+ public SparkCubingV3() {
+ options = new Options();
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_CONF_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
+ ClassUtil.addClasspath(confPath);
+ final File[] files = new File(confPath).listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File pathname) {
+ if (pathname.getAbsolutePath().endsWith(".xml")) {
+ return true;
+ }
+ if (pathname.getAbsolutePath().endsWith(".properties")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ for (File file : files) {
+ sc.addFile(file.getAbsolutePath());
+ }
+ }
+
+
+ private static final void prepare() {
+ final File file = new File(SparkFiles.get("kylin.properties"));
+ final String confPath = file.getParentFile().getAbsolutePath();
+ System.out.println("conf directory:" + confPath);
+ System.setProperty(KylinConfig.KYLIN_CONF, confPath);
+ ClassUtil.addClasspath(confPath);
+ }
+
+
/**
 * Entry point of the Spark cubing job: reads the Hive flat table, encodes the
 * base cuboid, then aggregates level by level (layered cubing) and persists
 * each level to HDFS as SequenceFiles.
 *
 * NOTE(review): the anonymous Spark functions below capture driver-side
 * objects (baseCuboidBuilder, broadcasts, rowKeySplitter); all of them are
 * presumably Kryo/Java serializable — confirm before changing capture scope.
 */
@Override
protected void execute(OptionsHelper optionsHelper) throws Exception {
    // Command-line arguments parsed by the OptionsHelper wrapper.
    final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
    final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
    final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
    final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
    final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);

    SparkConf conf = new SparkConf().setAppName("Cubing Application");
    //serialization conf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrationRequired", "true");
    // Merge any user-supplied Kryo registrations with Kylin's own class list,
    // filtering out blank entries produced by splitting an empty string.
    // ("getKyroClasses" is a sibling helper; its name typo is kept as-is.)
    final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() {
        @Override
        public boolean apply(@Nullable String input) {
            return input != null && input.trim().length() > 0;
        }
    });
    conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ","));

    JavaSparkContext sc = new JavaSparkContext(conf);
    setupClasspath(sc, confPath);
    // The job overwrites its output; remove leftovers from a previous/failed run.
    HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));

    HiveContext sqlContext = new HiveContext(sc.sc());
    final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);

    // Driver-side lookup of the cube/segment metadata being built.
    System.setProperty(KylinConfig.KYLIN_CONF, confPath);
    final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
    final CubeDesc cubeDesc = cubeInstance.getDescriptor();
    final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
    final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);

    // Broadcast shared metadata once so every executor reuses a single copy.
    final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
    final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
    final Broadcast<BufferedMeasureCodec> vCodec = sc.broadcast(new BufferedMeasureCodec(cubeDesc.getMeasures()));
    NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(cubeSegment));

    final Broadcast<NDCuboidBuilder> vNDCuboidBuilder = sc.broadcast(ndCuboidBuilder);
    final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));

    final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
    final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
    final int measureNum = cubeDesc.getMeasures().size();
    // Encodes one flat-table row into the base cuboid's rowkey + measure values.
    // Captured by the PairFunction below, so it is serialized to executors.
    final BaseCuboidBuilder baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), intermediateTableDesc,
            AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
            vCodec.getValue(), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());

    // needAggr[i] == true when measure i may be re-aggregated on every cuboid;
    // measures that only aggregate in the base cuboid must not be merged again
    // at higher levels (handled by BaseCuboidReducerFunction2 below).
    boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
    boolean allNormalMeasure = true;
    for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
        needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
        allNormalMeasure = allNormalMeasure && needAggr[i];
    }
    logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);

    // encode with dimension encoding, transform to <byte[], Object[]> RDD
    final JavaPairRDD<byte[], Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, byte[], Object[]>() {
        @Override
        public Tuple2<byte[], Object[]> call(Row row) throws Exception {
            String[] rowArray = rowToArray(row);
            baseCuboidBuilder.resetAggrs();
            byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
            Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
            return new Tuple2<>(rowKey, result);
        }

        // Converts a Hive Row to String[] cell values; null cells stay null.
        private String[] rowToArray(Row row) {
            String[] result = new String[row.size()];
            for (int i = 0; i < row.size(); i++) {
                final Object o = row.get(i);
                if (o != null) {
                    result[i] = o.toString();
                } else {
                    result[i] = null;
                }
            }
            return result;
        }

    });


    final CuboidReducerFunction2 reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue());
    CuboidReducerFunction2 baseCuboidReducerFunction = reducerFunction2;
    if (allNormalMeasure == false) {
        // Base cuboid needs the variant that respects the needAggr mask.
        baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue(), needAggr);
    }

    // aggregate to calculate base cuboid
    final JavaPairRDD<byte[], Object[]> baseCuboidRDD = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction);
    persistent(baseCuboidRDD, vCodec.getValue(), outputPath, 0, sc.hadoopConfiguration());

    // aggregate to ND cuboids
    final int totalLevels = cubeDesc.getBuildLevel();

    // Layered cubing: cuboids of level N are derived from the level N-1 RDD.
    JavaPairRDD<byte[], Object[]> parentRDD = baseCuboidRDD;
    for (int level = 1; level <= totalLevels; level++) {
        JavaPairRDD<byte[], Object[]> childRDD = parentRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<byte[], Object[]>, byte[], Object[]>() {

            // Executor-side lazy-init flag: prepare() must run once per task JVM
            // before Kylin metadata can be read (transient, so it deserializes
            // as false on each executor).
            transient boolean initialized = false;

            RowKeySplitter rowKeySplitter = new RowKeySplitter(vCubeSegment.getValue(), 65, 256);

            @Override
            public Iterable<Tuple2<byte[], Object[]>> call(Tuple2<byte[], Object[]> tuple2) throws Exception {
                if (initialized == false) {
                    prepare();
                    initialized = true;
                }

                List<Tuple2<byte[], Object[]>> tuples = Lists.newArrayList();
                byte[] key = tuple2._1();
                long cuboidId = rowKeySplitter.split(key);
                Cuboid parentCuboid = Cuboid.findById(vCubeDesc.getValue(), cuboidId);

                Collection<Long> myChildren = vCuboidScheduler.getValue().getSpanningCuboid(cuboidId);

                // if still empty or null
                if (myChildren == null || myChildren.size() == 0) {
                    return tuples;
                }

                for (Long child : myChildren) {
                    Cuboid childCuboid = Cuboid.findById(vCubeDesc.getValue(), child);
                    // Re-encode the parent rowkey into the child cuboid's layout;
                    // measure values pass through unchanged (reduceByKey merges them).
                    Pair<Integer, ByteArray> result = vNDCuboidBuilder.getValue().buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());

                    byte[] newKey = new byte[result.getFirst()];
                    System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst());

                    tuples.add(new Tuple2<>(newKey, tuple2._2()));
                }

                return tuples;
            }
        }).reduceByKey(reducerFunction2);

        // persistent rdd to hdfs
        persistent(childRDD, vCodec.getValue(), outputPath, level, sc.hadoopConfiguration());
        parentRDD = childRDD;
    }

    logger.info("Finished on calculating all level cuboids.");

}
+
+ private void persistent(final JavaPairRDD<byte[], Object[]> rdd, final BufferedMeasureCodec codec, final String hdfsBaseLocation, int level, Configuration conf) {
+ final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+ final JavaPairRDD<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> serializedRDD = rdd.mapToPair(new PairFunction<Tuple2<byte[], Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+ @Override
+ public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<byte[], Object[]> tuple2) throws Exception {
+ ByteBuffer valueBuf = codec.encode(tuple2._2());
+ byte[] encodedBytes = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+ return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1()), new org.apache.hadoop.io.Text(encodedBytes));
+ }
+ });
+ logger.debug("Persisting RDD for level " + level + " into " + cuboidOutputPath);
+ serializedRDD.saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
+ logger.debug("Done: persisting RDD for level " + level);
+ }
+
+ class CuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
+ BufferedMeasureCodec codec;
+ CubeDesc cubeDesc;
+ int measureNum;
+ transient ThreadLocal<MeasureAggregators> current = new ThreadLocal<>();
+
+ CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec) {
+ this.codec = codec;
+ this.cubeDesc = cubeDesc;
+ this.measureNum = measureNum;
+ }
+
+ @Override
+ public Object[] call(Object[] input1, Object[] input2) throws Exception {
+ if (current.get() == null) {
+ current.set(new MeasureAggregators(cubeDesc.getMeasures()));
+ }
+ Object[] result = new Object[measureNum];
+ current.get().reset();
+ current.get().aggregate(input1);
+ current.get().aggregate(input2);
+ current.get().collectStates(result);
+ return result;
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ current = new ThreadLocal();
+ }
+ }
+
+ class BaseCuboidReducerFunction2 extends CuboidReducerFunction2 {
+ boolean[] needAggr;
+
+ BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec, boolean[] needAggr) {
+ super(measureNum, cubeDesc, codec);
+ this.needAggr = needAggr;
+ }
+
+ @Override
+ public Object[] call(Object[] input1, Object[] input2) throws Exception {
+ if (current.get() == null) {
+ current.set(new MeasureAggregators(cubeDesc.getMeasures()));
+ }
+ current.get().reset();
+ Object[] result = new Object[measureNum];
+ current.get().aggregate(input1, needAggr);
+ current.get().aggregate(input2, needAggr);
+ current.get().collectStates(result);
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 7c88372..644f73f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.Logger;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -38,11 +39,16 @@ public class SparkExecutable extends AbstractExecutable {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutable.class);
private static final String CLASS_NAME = "className";
+ private static final String JARS = "jars";
public void setClassName(String className) {
this.setParam(CLASS_NAME, className);
}
+ public void setJars(String jars) {
+ this.setParam(JARS, jars);
+ }
+
private String formatArgs() {
StringBuilder stringBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : getParams().entrySet()) {
@@ -50,6 +56,9 @@ public class SparkExecutable extends AbstractExecutable {
tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
if (entry.getKey().equals(CLASS_NAME)) {
stringBuilder.insert(0, tmp);
+ } else if (entry.getKey().equals(JARS)) {
+ // JARS is for spark-submit, not for app
+ continue;
} else {
stringBuilder.append(tmp);
}
@@ -65,12 +74,22 @@ public class SparkExecutable extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
final KylinConfig config = context.getConfig();
Preconditions.checkNotNull(config.getSparkHome());
- Preconditions.checkNotNull(config.getSparkMaster());
+ Preconditions.checkNotNull(config.getKylinJobJarPath());
+ String sparkConf = config.getSparkConfFile();
+ String jars = this.getParam(JARS);
+
+ String jobJar = config.getKylinJobJarPath();
+
+ if (StringUtils.isEmpty(jars)) {
+ jars = jobJar;
+ }
+
try {
- String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --master %s %s %s", config.getSparkHome(), config.getSparkMaster(), config.getKylinSparkJobJarPath(), formatArgs());
+ String cmd = String.format("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --properties-file %s --jars %s %s %s", config.getSparkHadoopConfDir(), config.getSparkHome(), sparkConf, jars, jobJar, formatArgs());
logger.info("cmd:" + cmd);
final StringBuilder output = new StringBuilder();
- config.getCliCommandExecutor().execute(cmd, new Logger() {
+ CliCommandExecutor exec = new CliCommandExecutor();
+ exec.execute(cmd, new Logger() {
@Override
public void log(String message) {
output.append(message);
@@ -84,4 +103,5 @@ public class SparkExecutable extends AbstractExecutable {
return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
}
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/examples/test_case_data/sandbox/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-spark-conf.properties b/examples/test_case_data/sandbox/kylin-spark-conf.properties
new file mode 100644
index 0000000..ca65994
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-spark-conf.properties
@@ -0,0 +1,27 @@
+spark.yarn.submit.file.replication=1
+spark.yarn.executor.memoryOverhead=200
+spark.yarn.driver.memoryOverhead=384
+#spark.master=local[4]
+spark.master=yarn
+spark.submit.deployMode=cluster
+spark.eventLog.enabled=true
+spark.yarn.scheduler.heartbeat.interval-ms=5000
+spark.yarn.preserve.staging.files=true
+spark.yarn.queue=default
+spark.yarn.containerLauncherMaxThreads=25
+spark.yarn.max.executor.failures=3
+spark.eventLog.dir=hdfs\:///spark-history
+spark.history.kerberos.enabled=true
+spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
+spark.history.ui.port=18080
+spark.history.fs.logDirectory=hdfs\:///spark-history
+spark.executor.memory=1G
+spark.storage.memoryFraction=0.3
+spark.executor.cores=1
+spark.executor.instances=1
+spark.history.kerberos.keytab=none
+spark.history.kerberos.principal=none
+spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+spark.driver.extraJavaOptions=-Dhdp.version=current
+spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+spark.executor.extraJavaOptions=-Dhdp.version=current
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 20bc427..db8eb7a 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -63,7 +63,7 @@ kylin.job.retry=0
# you will have to specify kylin.job.remote-cli-hostname, kylin.job.remote-cli-username and kylin.job.remote-cli-password
# It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine
# (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands)
-kylin.job.use-remote-cli=false
+kylin.job.use-remote-cli=true
# Only necessary when kylin.job.use-remote-cli=true
kylin.job.remote-cli-hostname=sandbox
@@ -154,3 +154,12 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600
# Env DEV|QA|PROD
kylin.env=DEV
+kylin.source.hive.keep-flat-table=true
+
+### Spark ###
+#kylin.engine.spark.spark-home=/usr/hdp/2.2.4.2-2/spark
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
+kylin.engine.spark.env.hadoop-conf-dir=/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox
+kylin.engine.spark.spark-home=/Users/shishaofeng/spark-1.6.3-bin-hadoop2.6
+kylin.engine.spark.properties-file=/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/kylin-spark-conf.properties
+kylin.engine.spark.conf.jars=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2ff4c54..73a4b4d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
<commons-math3.version>3.6.1</commons-math3.version>
<!-- Spark -->
- <spark.version>1.3.0</spark.version>
+ <spark.version>1.6.3</spark.version>
<!-- Utility -->
<log4j.version>1.2.17</log4j.version>
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 978f477..f5b9f7e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -327,10 +327,10 @@ public class CubeController extends BasicController {
throw new InternalErrorException("Cannot find cube '" + cubeName + "'");
}
- if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) {
- int num = cube.getBuildingSegments().size();
- throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. Discard the related job first."));
- }
+// if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) {
+// int num = cube.getBuildingSegments().size();
+// throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. Discard the related job first."));
+// }
return cubeService.purgeCube(cube);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index cf92fb1..f3b61ef 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -249,6 +249,13 @@
<scope>provided</scope>
</dependency>
+ <!-- Spark dependency -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>