Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/05 03:18:33 UTC
[kylin] branch master updated: KYLIN-3427 convert to HFile in Spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 54cd2c7 KYLIN-3427 convert to HFile in Spark
54cd2c7 is described below
commit 54cd2c793bb5487cefc6def073a902ae73e5c5d4
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sat Jun 30 10:45:47 2018 +0800
KYLIN-3427 convert to HFile in Spark
---
build/conf/kylin-spark-log4j.properties | 43 +++
.../java/org/apache/kylin/common/KylinConfig.java | 2 +-
.../org/apache/kylin/common/util/BytesUtil.java | 15 +
.../org/apache/kylin/common/util/ClassUtil.java | 10 +
.../src/main/resources/kylin-defaults.properties | 8 +-
.../org/apache/kylin/metadata/model/JoinsTree.java | 2 +-
.../org/apache/kylin/metrics/MetricsManager.java | 2 +-
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 30 +-
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 14 -
.../apache/kylin/engine/mr/JobBuilderSupport.java | 55 ++++
engine-spark/pom.xml | 11 -
.../org/apache/kylin/engine/spark/ISparkInput.java | 60 ++++
.../apache/kylin/engine/spark/ISparkOutput.java | 129 ++++++++
.../kylin/engine/spark/KylinKryoRegistrator.java | 5 +-
.../kylin/engine/spark/KylinSparkJobListener.java | 55 ++++
.../engine/spark/SparkBatchCubingEngine2.java | 39 ++-
.../engine/spark/SparkBatchCubingJobBuilder2.java | 66 ++--
.../engine/spark/SparkBatchMergeJobBuilder2.java | 36 +--
.../kylin/engine/spark/SparkCubingByLayer.java | 5 +-
.../apache/kylin/engine/spark/SparkExecutable.java | 28 +-
.../org/apache/kylin/engine/spark/SparkUtil.java | 63 ++++
source-hive/pom.xml | 8 +
.../kylin/source/hive/GarbageCollectionStep.java | 117 +++++++
.../apache/kylin/source/hive/HiveInputBase.java | 167 ++++++++++
.../org/apache/kylin/source/hive/HiveMRInput.java | 336 +--------------------
.../org/apache/kylin/source/hive/HiveSource.java | 3 +
.../apache/kylin/source/hive/HiveSparkInput.java | 125 ++++++++
.../source/hive/RedistributeFlatHiveTableStep.java | 147 +++++++++
.../apache/kylin/source/jdbc/JdbcHiveMRInput.java | 2 +-
.../apache/kylin/source/hive/HiveMRInputTest.java | 5 +-
.../apache/kylin/source/kafka/KafkaInputBase.java | 163 ++++++++++
.../apache/kylin/source/kafka/KafkaMRInput.java | 178 +----------
.../org/apache/kylin/source/kafka/KafkaSource.java | 3 +
.../apache/kylin/source/kafka/KafkaSparkInput.java | 120 ++++++++
storage-hbase/pom.xml | 12 +
.../apache/kylin/storage/hbase/HBaseStorage.java | 19 +-
.../{HBaseMRSteps.java => HBaseJobSteps.java} | 47 +--
.../hbase/steps/HBaseMROutput2Transition.java | 8 +-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 213 +------------
.../hbase/steps/HBaseSparkOutputTransition.java | 100 ++++++
.../kylin/storage/hbase/steps/HBaseSparkSteps.java | 72 +++++
.../kylin/storage/hbase/steps/KeyValueCreator.java | 3 +-
.../kylin/storage/hbase/steps/RowKeyWritable.java | 41 ++-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 305 +++++++++++++++++++
.../storage/hbase/steps/HFilePartitionerTest.java | 65 ++++
45 files changed, 2074 insertions(+), 863 deletions(-)
diff --git a/build/conf/kylin-spark-log4j.properties b/build/conf/kylin-spark-log4j.properties
new file mode 100644
index 0000000..948fb32
--- /dev/null
+++ b/build/conf/kylin-spark-log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootCategory=WARN,stderr,stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.Target=System.err
+log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
+
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
+log4j.logger.org.apache.spark.sql=WARN
+
+log4j.logger.org.apache.kylin=DEBUG
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 81c9b89..4b3b7c3 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -400,7 +400,7 @@ public class KylinConfig extends KylinConfigBase {
// ============================================================================
- Map<Class, Object> managersCache = new ConcurrentHashMap<>();
+ transient Map<Class, Object> managersCache = new ConcurrentHashMap<>();
private KylinConfig() {
super();
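Marking managersCache transient keeps the per-instance manager cache out of serialization, which matters once KylinConfig objects get shipped to Spark executors. A standalone sketch of the effect (demo class, not Kylin code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Demo: a transient field is skipped by Java serialization and comes back
    // null on the deserializing side (e.g. a Spark executor).
    public class TransientCacheDemo implements Serializable {
        transient Map<Class, Object> managersCache = new ConcurrentHashMap<>();
        String name = "config";

        public static void main(String[] args) throws Exception {
            TransientCacheDemo original = new TransientCacheDemo();
            original.managersCache.put(String.class, "a cached manager");

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(original);
            oos.close();

            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
            TransientCacheDemo copy = (TransientCacheDemo) ois.readObject();

            System.out.println(copy.name);          // "config" survives the round trip
            System.out.println(copy.managersCache); // null -- the cache was not shipped
        }
    }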
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
index 0aa47be..4a54c48 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/BytesUtil.java
@@ -457,4 +457,19 @@ public class BytesUtil {
return sb.toString();
}
+ /**
+ *
+ * @param hex String value of a byte array in hex, e.g., "\\x00\\x0A"
+ * @return the byte array that the hex represented.
+ */
+ public static byte[] fromHex(String hex) {
+ byte[] b = new byte[hex.length() / 4];
+ for (int i = 0; i < b.length; i++) {
+ int index = i * 4;
+ int v = Integer.parseInt(hex.substring(index + 2, index + 4), 16);
+ b[i] = (byte) v;
+ }
+ return b;
+ }
+
}
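The new fromHex expects each byte as a four-character \xNN escape, so it walks the string in strides of four and parses the trailing two hex digits of each stride. A standalone sketch mirroring the method above:

    // Standalone sketch of the decoding logic: "\x00\x0A" -> {0x00, 0x0A}.
    public class FromHexDemo {
        static byte[] fromHex(String hex) {
            byte[] b = new byte[hex.length() / 4];    // each byte is "\xNN" = 4 chars
            for (int i = 0; i < b.length; i++) {
                int index = i * 4;
                b[i] = (byte) Integer.parseInt(hex.substring(index + 2, index + 4), 16);
            }
            return b;
        }

        public static void main(String[] args) {
            byte[] bytes = fromHex("\\x00\\x0A");     // Java literal for \x00\x0A
            System.out.println(bytes.length + " bytes: " + bytes[0] + ", " + bytes[1]); // 2 bytes: 0, 10
        }
    }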
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
index a54a24d..bfc6ce1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
@@ -134,4 +134,14 @@ public class ClassUtil {
throw new RuntimeException(var6);
}
}
+
+ public static String findContainingJar(String className, String perferLibraryName) {
+ try {
+ return findContainingJar(Class.forName(className), perferLibraryName);
+ } catch (ClassNotFoundException e) {
+ logger.warn("failed to locate jar for class " + className + ", ignore it");
+ }
+
+ return "";
+ }
}
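The new overload resolves a jar by class name and swallows ClassNotFoundException, returning an empty string instead, which suits callers that assemble an optional --jars list for spark-submit. An illustrative use (the class and library-name hint below are examples, not from this commit):

    import org.apache.kylin.common.util.ClassUtil;

    // Illustrative usage: resolve the jar shipping a class, preferring a jar
    // whose file name contains the hint; "" if the class is not on the classpath.
    public class FindJarDemo {
        public static void main(String[] args) {
            String jar = ClassUtil.findContainingJar(
                    "org.apache.hadoop.hbase.client.Connection", // example class, assumed present
                    "hbase-client");                             // preferred library-name hint
            System.out.println(jar.isEmpty() ? "not found" : jar);
        }
    }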
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 37520ef..b391b5e 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -66,10 +66,10 @@ kylin.engine.default=2
kylin.storage.default=2
kylin.web.hive-limit=20
kylin.web.help.length=4
-kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs21/tutorial/kylin_sample.html
-kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs21/tutorial/odbc.html
-kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs21/tutorial/tableau_91.html
-kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs21/howto/howto_optimize_cubes.html
+kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs/tutorial/kylin_sample.html
+kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs/tutorial/odbc.html
+kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs/tutorial/tableau_91.html
+kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs/howto/howto_optimize_cubes.html
kylin.web.link-streaming-guide=http://kylin.apache.org/
kylin.htrace.show-gui-trace-toggle=false
kylin.web.link-hadoop=
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index d86a74a..a29d76e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -203,7 +203,7 @@ public class JoinsTree implements Serializable {
boolean matches(JoinDesc join1, JoinDesc join2);
}
- public static class DefaultJoinDescMatcher implements IJoinDescMatcher {
+ public static class DefaultJoinDescMatcher implements IJoinDescMatcher, Serializable {
@Override
public boolean matches(JoinDesc join1, JoinDesc join2) {
if (join1 == null) {
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
index e7f042e..bcfb275 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
@@ -132,7 +132,7 @@ public class MetricsManager {
Preconditions.checkArgument(activeReservoirPointers.size() == sourceReporterBindProps.keySet().size(),
"Duplicate register names exist!!!");
} else {
- logger.info("Kylin metrics monitor is not enabled!!!");
+ logger.info("Kylin metrics monitor is not enabled");
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index b1149ed..5498365 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,17 +22,14 @@ import java.util.List;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidUtil;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
import org.apache.kylin.engine.mr.steps.NDCuboidJob;
-import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.model.TblColRef;
@@ -89,7 +86,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
- private boolean isEnableUHCDictStep() {
+ public boolean isEnableUHCDictStep() {
if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
return false;
}
@@ -102,21 +99,6 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return true;
}
- private LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob result) {
- LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result);
- CubeDesc cubeDesc = seg.getCubeDesc();
- List<String> allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes();
- if (allSnapshotTypes.isEmpty()) {
- return null;
- }
- for (String snapshotType : allSnapshotTypes) {
- logger.info("add lookup table materialize steps for storage type:{}", snapshotType);
- ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType);
- materializer.materializeLookupTablesForCube(lookupMaterializeContext, seg.getCubeInstance());
- }
- return lookupMaterializeContext;
- }
-
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
// Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
@@ -128,16 +110,6 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
}
}
- private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
- SaveStatisticsStep result = new SaveStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
- CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
- CubingExecutableUtil.setStatisticsPath(getStatisticsPath(jobId), result.getParams());
- CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
- return result;
- }
-
protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) {
// base cuboid job
MapReduceExecutable cubeStep = new MapReduceExecutable();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index badf628..d443f52 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -21,9 +21,6 @@ package org.apache.kylin.engine.mr;
import java.util.List;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
-import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,15 +69,4 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
- private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
- MergeStatisticsStep result = new MergeStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-
- CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
- CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
- CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
-
- return result;
- }
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 6458a6a..a1b2cfe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -31,6 +32,8 @@ import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
+import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
@@ -38,6 +41,7 @@ import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.model.TblColRef;
/**
* Hold reusable steps for builders.
@@ -79,6 +83,18 @@ public class JobBuilderSupport {
return result;
}
+ public MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+ MergeStatisticsStep result = new MergeStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+ CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
+ return result;
+ }
+
public MapReduceExecutable createBuildUHCDictStep(String jobId) {
MapReduceExecutable result = new MapReduceExecutable();
result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
@@ -173,6 +189,45 @@ public class JobBuilderSupport {
return result;
}
+
+ public boolean isEnableUHCDictStep() {
+ if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
+ return false;
+ }
+
+ List<TblColRef> uhcColumns = seg.getCubeDesc().getAllUHCColumns();
+ if (uhcColumns.size() == 0) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob result) {
+ LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result);
+ CubeDesc cubeDesc = seg.getCubeDesc();
+ List<String> allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes();
+ if (allSnapshotTypes.isEmpty()) {
+ return null;
+ }
+ for (String snapshotType : allSnapshotTypes) {
+ ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType);
+ materializer.materializeLookupTablesForCube(lookupMaterializeContext, seg.getCubeInstance());
+ }
+ return lookupMaterializeContext;
+ }
+
+
+ public SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+ SaveStatisticsStep result = new SaveStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setStatisticsPath(getStatisticsPath(jobId), result.getParams());
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+ return result;
+ }
+
// ============================================================================
public String getJobWorkingDir(String jobId) {
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 819aaaa..700aeb5 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -32,10 +32,6 @@
</parent>
<dependencies>
- <dependency>
- <groupId>org.apache.kylin</groupId>
- <artifactId>kylin-storage-hbase</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -81,13 +77,6 @@
<dependency>
<groupId>org.apache.kylin</groupId>
- <artifactId>kylin-storage-hbase</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
new file mode 100644
index 0000000..5459c70
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+/**
+ * Any ISource that wishes to serve as input of the Spark build engine must adapt to this interface.
+ */
+public interface ISparkInput {
+
+ /** Return a helper to participate in batch cubing job flow. */
+ public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+
+ /** Return a helper to participate in batch cubing merge job flow. */
+ public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+
+ /**
+ * Participates in the batch cubing flow as the input side. Responsible for creating the
+ * intermediate flat table (Phase 1) and cleaning up any leftovers (Phase 4).
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+ * - Phase 3: Build Cube (with FlatTableInputFormat)
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface ISparkBatchCubingInputSide {
+
+ /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does necessary clean up, like delete the intermediate flat table */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ public interface ISparkBatchMergeInputSide {
+
+ /** Add step that executes before merge dictionary and before merge cube. */
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ }
+}
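A source plugs into the Spark engine by returning these two helpers. A minimal, hypothetical no-op adapter (names are illustrative, not part of this commit) shows the shape of the contract:

    import org.apache.kylin.engine.spark.ISparkInput;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
    import org.apache.kylin.metadata.model.ISegment;

    // Hypothetical skeleton of a source-side adapter; a real source would add
    // flat-table creation steps in Phase 1 and cleanup steps in Phase 4.
    public class NoOpSparkInput implements ISparkInput {

        @Override
        public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
            return new ISparkBatchCubingInputSide() {
                @Override
                public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
                    // e.g. jobFlow.addTask(createFlatTableStep(...));
                }

                @Override
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                    // e.g. jobFlow.addTask(dropIntermediateTableStep(...));
                }
            };
        }

        @Override
        public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
            return jobFlow -> { /* nothing to do before merging dictionaries */ };
        }
    }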
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkOutput.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkOutput.java
new file mode 100644
index 0000000..0ebc611
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkOutput.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface ISparkOutput {
+
+ /** Return a helper to participate in batch cubing job flow. */
+ public ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+ /**
+ * Participates in the batch cubing flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 3.
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface ISparkBatchCubingOutputSide {
+
+ /** Add step that executes after build dictionary and before build cube. */
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+ /**
+ * Add step that saves cuboids from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where the key is CUBOID+D1+D2+..+Dn
+ * and the value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; each Dx is a
+ * dictionary-encoded dimension value; each Mx is a serialized measure value.
+ */
+ void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+ }
+
+ /** Return a helper to participate in batch merge job flow. */
+ ISparkBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+
+ /**
+ * Participates in the batch merge flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 2.
+ *
+ * - Phase 1: Merge Dictionary
+ * - Phase 2: Merge Cube
+ * - Phase 3: Update Metadata & Cleanup
+ */
+ interface ISparkBatchMergeOutputSide {
+
+ /** Add step that executes after merge dictionary and before merge cube. */
+ void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where the key is CUBOID+D1+D2+..+Dn
+ * and the value is M1+M2+..+Mm. CUBOID is the 8-byte cuboid ID; each Dx is a
+ * dictionary-encoded dimension value; each Mx is a serialized measure value.
+ */
+ void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+
+ }
+
+ interface ISparkMergeOutputFormat {
+
+ /** Configure the InputFormat of given job. */
+ void configureJobInput(Job job, String input) throws Exception;
+
+ /** Configure the OutputFormat of given job. */
+ void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception;
+
+ CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
+ }
+
+ ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(CubeSegment seg);
+
+ /**
+ * Participates in the batch optimize flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 3.
+ *
+ * - Phase 1: Filter Recommended Cuboid Data
+ * - Phase 2: Copy Dictionary & Calculate Statistics & Update Reused Cuboid Shard
+ * - Phase 3: Build Cube
+ * - Phase 4: Cleanup Optimize
+ * - Phase 5: Update Metadata & Cleanup
+ */
+ interface ISparkBatchOptimizeOutputSide {
+
+ /** Create HTable based on recommended cuboids & statistics */
+ void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow);
+
+ /** Build only the missing cuboids */
+ void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Clean up intermediate cuboid data on HDFS */
+ void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+ /** Invoked by the checkpoint job; cleans up old segments' HTables and related working directories */
+ void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+}
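Both the build and merge output sides document the same cuboid layout: a sequence-file key of the 8-byte cuboid ID followed by dictionary-encoded dimension values, with serialized measures as the value. A sketch of composing such a key (illustrative only, not the storage module's actual writer):

    import java.nio.ByteBuffer;

    // Illustrative composition of the CUBOID+D1+..+Dn key described above.
    public class CuboidKeyDemo {
        public static void main(String[] args) {
            long cuboidId = 0b111L;               // 8-byte cuboid ID (which dims are present)
            byte[] d1 = { 0x01 };                 // dictionary-encoded dimension values
            byte[] d2 = { 0x00, 0x2A };

            ByteBuffer key = ByteBuffer.allocate(8 + d1.length + d2.length);
            key.putLong(cuboidId).put(d1).put(d2);

            System.out.println("key length = " + key.array().length); // 11
        }
    }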
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index f9aee63..90c52c2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -20,7 +20,7 @@ package org.apache.kylin.engine.spark;
import java.util.LinkedHashSet;
import java.util.Set;
-
+import org.apache.hadoop.io.Text;
import org.apache.kylin.engine.spark.util.PercentileCounterSerializer;
import org.apache.kylin.measure.percentile.PercentileCounter;
import org.apache.spark.serializer.KryoRegistrator;
@@ -45,6 +45,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(String[].class);
kyroClasses.add(String[][].class);
kyroClasses.add(Object[].class);
+ kyroClasses.add(Text.class);
kyroClasses.add(java.math.BigDecimal.class);
kyroClasses.add(java.util.ArrayList.class);
kyroClasses.add(java.util.LinkedList.class);
@@ -83,7 +84,6 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
-
kylinClassByReflection1(kyroClasses);
kylinClassByReflection2(kyroClasses);
@@ -94,6 +94,7 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(org.roaringbitmap.buffer.MappeableArrayContainer.class);
kyroClasses.add(org.roaringbitmap.buffer.MappeableBitmapContainer.class);
+ kyroClasses.add(Class.class);
addClassQuitely(kyroClasses, "com.google.common.collect.EmptyImmutableList");
addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
new file mode 100644
index 0000000..7976c3b
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.Serializable;
+
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+
+public class KylinSparkJobListener extends SparkListener implements Serializable {
+
+ public JobMetrics metrics = new JobMetrics();
+
+ @Override
+ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ if (taskEnd.taskMetrics().outputMetrics() != null) {
+ metrics.bytesWritten += taskEnd.taskMetrics().outputMetrics().bytesWritten();
+ metrics.recordsWritten += taskEnd.taskMetrics().outputMetrics().recordsWritten();
+ }
+ }
+
+ public JobMetrics getMetrics() {
+ return metrics;
+ }
+
+ public static class JobMetrics implements Serializable {
+ long bytesWritten;
+ long recordsWritten;
+
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ public long getRecordsWritten() {
+ return recordsWritten;
+ }
+ }
+
+}
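The listener sums output metrics as tasks finish, so the driver can report bytes and records written once the job completes. A minimal local-mode sketch (the output path is illustrative):

    import java.util.Arrays;

    import org.apache.kylin.engine.spark.KylinSparkJobListener;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Minimal sketch: register the listener, run an output-producing action,
    // then read the accumulated metrics on the driver.
    public class ListenerDemo {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("listener-demo").setMaster("local[2]");
            try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
                KylinSparkJobListener listener = new KylinSparkJobListener();
                jsc.sc().addSparkListener(listener);

                jsc.parallelize(Arrays.asList(1, 2, 3))
                   .saveAsTextFile("/tmp/listener-demo-out"); // illustrative output path

                System.out.println("bytes written: " + listener.getMetrics().getBytesWritten());
                System.out.println("records written: " + listener.getMetrics().getRecordsWritten());
            }
        }
    }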
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
index a7a4151..47316b4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -19,15 +19,50 @@
package org.apache.kylin.engine.spark;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.BatchOptimizeJobBuilder2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
/**
*/
-public class SparkBatchCubingEngine2 extends MRBatchCubingEngine2 {
+public class SparkBatchCubingEngine2 implements IBatchCubingEngine {
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
+ return new CubeJoinedFlatTableDesc(cubeDesc);
+ }
+
+ @Override
+ public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
+ return new CubeJoinedFlatTableDesc(newSegment);
+ }
+
@Override
public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
return new SparkBatchCubingJobBuilder2(newSegment, submitter).build();
}
+ @Override
+ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
+ return new SparkBatchMergeJobBuilder2(mergeSegment, submitter).build();
+ }
+
+ @Override
+ public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+ //TODO use Spark to optimize
+ return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+ }
+
+ @Override
+ public Class<?> getSourceInterface() {
+ return ISparkInput.class;
+ }
+
+ @Override
+ public Class<?> getStorageInterface() {
+ return ISparkOutput.class;
+ }
+
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 6ce9b90..e545166 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -20,12 +20,12 @@ package org.apache.kylin.engine.spark;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
-import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.LookupMaterializeContext;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -37,15 +37,57 @@ import java.util.Map;
/**
*/
-public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
+public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(SparkBatchCubingJobBuilder2.class);
+ private final ISparkInput.ISparkBatchCubingInputSide inputSide;
+ private final ISparkOutput.ISparkBatchCubingOutputSide outputSide;
+
public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
+ this.inputSide = SparkUtil.getBatchCubingInputSide(seg);
+ this.outputSide = SparkUtil.getBatchCubingOutputSide(seg);
}
- @Override
+ public CubingJob build() {
+ logger.info("Spark new job to BUILD segment " + seg);
+
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
+
+ // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
+ inputSide.addStepPhase1_CreateFlatTable(result);
+
+ // Phase 2: Build Dictionary
+ result.addTask(createFactDistinctColumnsStep(jobId));
+
+ if (isEnableUHCDictStep()) {
+ result.addTask(createBuildUHCDictStep(jobId));
+ }
+
+ result.addTask(createBuildDictionaryStep(jobId));
+ result.addTask(createSaveStatisticsStep(jobId));
+
+ // add materialize lookup tables if needed
+ LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
+
+ outputSide.addStepPhase2_BuildDictionary(result);
+
+ // Phase 3: Build Cube
+ addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
+ inputSide.addStepPhase4_Cleanup(result);
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final SparkExecutable sparkExecutable = new SparkExecutable();
sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
@@ -53,20 +95,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
result.addTask(sparkExecutable);
}
- @Override
- protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) {
-
- }
-
- private static String findJar(String className, String perferLibraryName) {
- try {
- return ClassUtil.findContainingJar(Class.forName(className), perferLibraryName);
- } catch (ClassNotFoundException e) {
- logger.warn("failed to locate jar for class " + className + ", ignore it");
- }
-
- return "";
- }
public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
final String jobId, final String cuboidRootPath) {
@@ -90,7 +118,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
}
- private String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+ public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
Map<String, String> param = new HashMap<>();
param.put("path", getDumpMetadataPath(jobId));
return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
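getSegmentMetadataUrl points the Spark application at a metadata dump on HDFS instead of a live metastore connection. Assuming the default identifier kylin_metadata and a /kylin working directory, the rendered URL would look roughly like:

    // Illustrative rendering only; the real path comes from getDumpMetadataPath(jobId)
    // and <job_id> is a placeholder, not a value from this commit.
    String metaUrl = "kylin_metadata@hdfs,path=/kylin/kylin_metadata/kylin-<job_id>/metadata";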
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
similarity index 64%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
copy to engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index badf628..b68f6e86 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -16,34 +16,33 @@
* limitations under the License.
*/
-package org.apache.kylin.engine.mr;
+package org.apache.kylin.engine.spark;
import java.util.List;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
-import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class BatchMergeJobBuilder2 extends JobBuilderSupport {
- private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
+public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
+ private static final Logger logger = LoggerFactory.getLogger(SparkBatchMergeJobBuilder2.class);
- private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
- private final IMRInput.IMRBatchMergeInputSide inputSide;
+ private final ISparkOutput.ISparkBatchMergeOutputSide outputSide;
+ private final ISparkInput.ISparkBatchMergeInputSide inputSide;
- public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
+ public SparkBatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
- this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
- this.inputSide = MRUtil.getBatchMergeInputSide(seg);
+ this.outputSide = SparkUtil.getBatchMergeOutputSide2(seg);
+ this.inputSide = SparkUtil.getBatchMergeInputSide(seg);
}
public CubingJob build() {
- logger.info("MR_V2 new job to MERGE segment " + seg);
+ logger.info("Spark_V2 new job to MERGE segment " + seg);
final CubeSegment cubeSegment = seg;
final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
@@ -71,16 +70,5 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
-
- private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
- MergeStatisticsStep result = new MergeStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-
- CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
- CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
- CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
-
- return result;
- }
+
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 8f547fd..f842fd5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -129,8 +129,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
- Class[] kryoClassArray = new Class[] { org.apache.hadoop.io.Text.class,
- Class.forName("scala.reflect.ClassTag$$anon$1"), java.lang.Class.class };
+ Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
//serialization conf
@@ -242,7 +241,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
allRDDs[totalLevels].unpersist();
logger.info("Finished on calculating all level cuboids.");
- deleteHDFSMeta(metaUrl);
+ // deleteHDFSMeta(metaUrl);
}
protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
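With Text and Class now registered inside KylinKryoRegistrator, the job-side array only needs the Scala ClassTag class. The elided "//serialization conf" lines that consume kryoClassArray presumably look roughly like the following; the Spark settings exist as written, but the exact block is not shown in this hunk:

    // Hedged sketch of the serialization setup; `conf` and `kryoClassArray`
    // are from the surrounding execute() method in SparkCubingByLayer.
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
    conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);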
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 8de78c0..dfaa2e1 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -37,6 +37,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -53,6 +54,7 @@ public class SparkExecutable extends AbstractExecutable {
private static final String CLASS_NAME = "className";
private static final String JARS = "jars";
private static final String JOB_ID = "jobId";
+ private String counter_save_as;
public void setClassName(String className) {
this.setParam(CLASS_NAME, className);
@@ -66,6 +68,10 @@ public class SparkExecutable extends AbstractExecutable {
this.setParam(JARS, jars);
}
+ public void setCounterSaveAs(String value) {
+ counter_save_as = value;
+ }
+
private String formatArgs() {
StringBuilder stringBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : getParams().entrySet()) {
@@ -125,7 +131,7 @@ public class SparkExecutable extends AbstractExecutable {
try {
attachSegmentMetadataWithDict(segment);
} catch (IOException e) {
- throw new ExecuteException("meta dump fialed");
+ throw new ExecuteException("meta dump failed");
}
StringBuilder stringBuilder = new StringBuilder();
@@ -145,7 +151,10 @@ public class SparkExecutable extends AbstractExecutable {
CliCommandExecutor exec = new CliCommandExecutor();
PatternedLogger patternedLogger = new PatternedLogger(logger);
exec.execute(cmd, patternedLogger);
- getManager().addJobInfo(getId(), patternedLogger.getInfo());
+
+ Map<String, String> joblogInfo = patternedLogger.getInfo();
+ readCounters(joblogInfo);
+ getManager().addJobInfo(getId(), joblogInfo);
return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog());
} catch (Exception e) {
logger.error("error run spark job:", e);
@@ -192,4 +201,19 @@ public class SparkExecutable extends AbstractExecutable {
//upload metadata
ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
}
+
+ private void readCounters(final Map<String, String> info) {
+ if (counter_save_as != null) {
+ String[] saveAsNames = counter_save_as.split(",");
+ saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
+ saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_SIZE), saveAsNames, 1, info);
+ saveCounterAs(info.get(ExecutableConstants.HDFS_BYTES_WRITTEN), saveAsNames, 2, info);
+ }
+ }
+
+ private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
+ if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) {
+ info.put(saveAsNames[i].trim(), counter);
+ }
+ }
}
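readCounters republishes three well-known counters from the parsed job log under caller-chosen keys, strictly by position: index 0 gets the source record count, 1 the source size, 2 the HDFS bytes written; blank positions are skipped. A hedged wiring sketch (the key names below are hypothetical):

    // Illustrative: republish the counters under our own job-info keys.
    SparkExecutable sparkExecutable = new SparkExecutable();
    sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
    sparkExecutable.setCounterSaveAs("sourceRecordCount,sourceSizeBytes,cubeSizeBytes"); // hypothetical keys
    // Positional and blank-tolerant: "a,,c" would save only positions 0 and 2.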
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
new file mode 100644
index 0000000..a4e17c3
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.StorageFactory;
+
+public class SparkUtil {
+
+ public static ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+ return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
+ }
+
+ private static TableDesc getTableDesc(String tableName, String prj) {
+ return TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName, prj);
+ }
+
+ public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchCubingOutputSide(seg);
+ }
+
+ public static ISparkOutput.ISparkBatchMergeOutputSide getBatchMergeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchMergeOutputSide(seg);
+ }
+
+ public static ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
+ }
+
+ public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+ }
+
+ private static synchronized GenericOptionsParser getParser(Configuration conf, String[] args) throws Exception {
+ return new GenericOptionsParser(conf, args);
+ }
+}
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index e7afb49..4c7e937 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -101,6 +101,14 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-spark</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
new file mode 100644
index 0000000..ac25d07
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/GarbageCollectionStep.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class GarbageCollectionStep extends AbstractExecutable {
+ private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig config = context.getConfig();
+ StringBuffer output = new StringBuffer();
+ try {
+ output.append(cleanUpIntermediateFlatTable(config));
+ // don't drop the view, to avoid concurrency issues
+ //output.append(cleanUpHiveViewIntermediateTable(config));
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return ExecuteResult.createError(e);
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
+ StringBuffer output = new StringBuffer();
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final List<String> hiveTables = this.getIntermediateTables();
+ for (String hiveTable : hiveTables) {
+ if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
+ hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+ hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
+
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+ }
+ }
+ config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+ rmdirOnHDFS(getExternalDataPaths());
+ output.append("Path " + getExternalDataPaths() + " is deleted. \n");
+
+ return output.toString();
+ }
+
+ private void rmdirOnHDFS(List<String> paths) throws IOException {
+ for (String path : paths) {
+ Path externalDataPath = new Path(path);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ if (fs.exists(externalDataPath)) {
+ fs.delete(externalDataPath, true);
+ }
+ }
+ }
+
+ public void setIntermediateTables(List<String> tableIdentity) {
+ setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
+ }
+
+ private List<String> getIntermediateTables() {
+ List<String> intermediateTables = Lists.newArrayList();
+ String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
+ for (String t : tables) {
+ intermediateTables.add(t);
+ }
+ return intermediateTables;
+ }
+
+ public void setExternalDataPaths(List<String> externalDataPaths) {
+ setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
+ }
+
+ private List<String> getExternalDataPaths() {
+ String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
+ List<String> result = Lists.newArrayList();
+ for (String s : paths) {
+ result.add(s);
+ }
+ return result;
+ }
+
+ public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
+ setParam("oldHiveViewIntermediateTables", tableIdentities);
+ }
+}
\ No newline at end of file
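The step persists its inputs in the executable's string param map, comma-joining lists on write and splitting them back on read. A standalone sketch of that round trip (note the encoding assumes table names never contain commas):

    import java.util.Arrays;
    import java.util.List;

    // Standalone sketch of the comma-joined param round trip used by
    // setIntermediateTables/getIntermediateTables above.
    public class ParamRoundTripDemo {
        public static void main(String[] args) {
            List<String> tables = Arrays.asList("kylin_intermediate_a", "kylin_intermediate_b");
            String param = String.join(",", tables);   // what setIntermediateTables stores
            String[] back = param.split("\\s*,\\s*");  // split-and-trim on read, as in the step
            System.out.println(Arrays.toString(back));
        }
    }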
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
new file mode 100644
index 0000000..eae2e1c
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class HiveInputBase {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class);
+
+ protected static String getTableNameForHCat(TableDesc table) {
+ String tableName = (table.isView()) ? table.getMaterializedName() : table.getName();
+ String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
+ : table.getDatabase();
+ return String.format("%s.%s", database, tableName).toUpperCase();
+ }
+
+ protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow, String hdfsWorkingDir,
+ IJoinedFlatTableDesc flatTableDesc, String flatTableDatabase) {
+ final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+ jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatTableDesc));
+ }
+
+ protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
+ String cubeName, IJoinedFlatTableDesc flatDesc) {
+ // from Hive to Hive
+ final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
+ String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+
+ CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+ step.setInitStatement(hiveInitStatements);
+ step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
+ CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+ step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+ return step;
+ }
+
+ protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
+ IJoinedFlatTableDesc flatDesc) {
+ RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
+ step.setInitStatement(hiveInitStatements);
+ step.setIntermediateTable(flatDesc.getTableName());
+ step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
+ CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+ step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
+ return step;
+ }
+
+ protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
+ String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables) {
+ ShellExecutable step = new ShellExecutable();
+ step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
+
+ KylinConfig kylinConfig = flatDesc.getSegment().getConfig();
+ TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig);
+ final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
+
+ String prj = flatDesc.getDataModel().getProject();
+ for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) {
+ TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj);
+ if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) {
+ lookupViewsTables.add(tableDesc);
+ }
+ }
+
+ if (lookupViewsTables.size() == 0) {
+ return null;
+ }
+
+ HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
+ hiveCmdBuilder.addStatement(hiveInitStatements);
+ for (TableDesc lookUpTableDesc : lookupViewsTables) {
+ String identity = lookUpTableDesc.getIdentity();
+ String intermediate = lookUpTableDesc.getMaterializedName();
+ if (lookUpTableDesc.isView()) {
+ String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
+ hiveCmdBuilder.addStatement(materializeViewHql);
+ intermediateTables.add(intermediate);
+ }
+ }
+
+ step.setCmd(hiveCmdBuilder.build());
+ return step;
+ }
+
+ // each appended statement must be a complete HQL statement.
+ protected static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) {
+ StringBuilder createIntermediateTableHql = new StringBuilder();
+ createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n");
+ createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName
+ + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
+ createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET TBLPROPERTIES('auto.purge'='true');\n");
+ createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n");
+ return createIntermediateTableHql.toString();
+ }
+
+ protected static String getJobWorkingDir(DefaultChainedExecutable jobFlow, String hdfsWorkingDir) {
+
+ String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId());
+ if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) {
+ // Create the working dir ourselves instead of letting Hive create it,
+ // so that the directory owner is the Kylin user rather than the Hive user.
+ checkAndCreateWorkDir(jobWorkingDir);
+ }
+ return jobWorkingDir;
+ }
+
+ protected static void checkAndCreateWorkDir(String jobWorkingDir) {
+ try {
+ Path path = new Path(jobWorkingDir);
+ FileSystem fileSystem = HadoopUtil.getFileSystem(path);
+ if (!fileSystem.exists(path)) {
+ logger.info("Create jobWorkDir : " + jobWorkingDir);
+ fileSystem.mkdirs(path);
+ }
+ } catch (IOException e) {
+ logger.error("Could not create lookup table dir: " + jobWorkingDir, e);
+ }
+ }
+
+}
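For reference, materializeViewHql emits four complete statements per view, one per append, so each addStatement call above receives well-formed HQL. A minimal sketch with hypothetical names (the method is protected static, so this assumes a caller in the same package or a subclass):

    String hql = HiveInputBase.materializeViewHql("kylin_intermediate_my_view",
            "DEFAULT.MY_VIEW", "/kylin/kylin_metadata/kylin-job_0001");
    // DROP TABLE IF EXISTS kylin_intermediate_my_view;
    // CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_my_view LIKE DEFAULT.MY_VIEW
    //   LOCATION '/kylin/kylin_metadata/kylin-job_0001/kylin_intermediate_my_view';
    // ALTER TABLE kylin_intermediate_my_view SET TBLPROPERTIES('auto.purge'='true');
    // INSERT OVERWRITE TABLE kylin_intermediate_my_view SELECT * FROM DEFAULT.MY_VIEW;

Setting 'auto.purge'='true' makes the later DROP skip the Hive trash, so cleanup frees the space immediately.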
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 8653500..8aec8b9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -21,58 +21,33 @@ package org.apache.kylin.source.hive;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveCmdBuilder;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.common.PatternedLogger;
-import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.TableMetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.metadata.model.JoinTableDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-public class HiveMRInput implements IMRInput {
+public class HiveMRInput extends HiveInputBase implements IMRInput {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
- public static String getTableNameForHCat(TableDesc table) {
- String tableName = (table.isView()) ? table.getMaterializedName() : table.getName();
- String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
- : table.getDatabase();
- return String.format("%s.%s", database, tableName).toUpperCase();
- }
-
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
return new BatchCubingInputSide(flatDesc);
@@ -132,7 +107,7 @@ public class HiveMRInput implements IMRInput {
final protected String flatTableDatabase;
final protected String hdfsWorkingDir;
- String hiveViewIntermediateTables = "";
+ List<String> hiveViewIntermediateTables = Lists.newArrayList();
public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -154,7 +129,7 @@ public class HiveMRInput implements IMRInput {
// then count and redistribute
if (cubeConfig.isHiveRedistributeEnabled()) {
if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName));
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
}
}
@@ -165,132 +140,31 @@ public class HiveMRInput implements IMRInput {
protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
- jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName));
+ jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
}
protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
- AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir);
+ AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables);
if (task != null) {
jobFlow.addTask(task);
}
}
- protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-
- String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId());
- if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) {
- // Create work dir to avoid hive create it,
- // the difference is that the owners are different.
- checkAndCreateWorkDir(jobWorkingDir);
- }
- return jobWorkingDir;
- }
-
- private void checkAndCreateWorkDir(String jobWorkingDir) {
- try {
- Path path = new Path(jobWorkingDir);
- FileSystem fileSystem = HadoopUtil.getFileSystem(path);
- if (!fileSystem.exists(path)) {
- logger.info("Create jobWorkDir : " + jobWorkingDir);
- fileSystem.mkdirs(path);
- }
- } catch (IOException e) {
- logger.error("Could not create lookUp table dir : " + jobWorkingDir);
- }
- }
-
- private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) {
- RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
- step.setInitStatement(hiveInitStatements);
- step.setIntermediateTable(flatDesc.getTableName());
- step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
- CubingExecutableUtil.setCubeName(cubeName, step.getParams());
- step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
- return step;
- }
-
- private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
- String jobWorkingDir) {
- ShellExecutable step = new ShellExecutable();
- step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
-
- KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
- TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig);
- final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
-
- String prj = flatDesc.getDataModel().getProject();
- for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) {
- TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj);
- if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) {
- lookupViewsTables.add(tableDesc);
- }
- }
-
- if (lookupViewsTables.size() == 0) {
- return null;
- }
-
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride());
- hiveCmdBuilder.addStatement(hiveInitStatements);
- for (TableDesc lookUpTableDesc : lookupViewsTables) {
- String identity = lookUpTableDesc.getIdentity();
- String intermediate = lookUpTableDesc.getMaterializedName();
- if (lookUpTableDesc.isView()) {
- String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
- hiveCmdBuilder.addStatement(materializeViewHql);
- hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
- }
- }
-
- hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0,
- hiveViewIntermediateTables.length() - 1);
-
- step.setCmd(hiveCmdBuilder.build());
- return step;
- }
-
- // each append must be a complete hql.
- public static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) {
- StringBuilder createIntermediateTableHql = new StringBuilder();
- createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n");
- createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName
- + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
- createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET TBLPROPERTIES('auto.purge'='true');\n");
- createIntermediateTableHql
- .append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n");
- return createIntermediateTableHql.toString();
- }
-
- private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
- String cubeName) {
- //from hive to hive
- final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
- String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
-
- CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
- step.setInitStatement(hiveInitStatements);
- step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
- CubingExecutableUtil.setCubeName(cubeName, step.getParams());
- step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
- return step;
- }
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- final String jobWorkingDir = getJobWorkingDir(jobFlow);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
GarbageCollectionStep step = new GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
- step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
+ step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
jobFlow.addTask(step);
}
@@ -304,196 +178,4 @@ public class HiveMRInput implements IMRInput {
}
}
- public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
- private final PatternedLogger stepLogger = new PatternedLogger(logger);
-
- private long computeRowCount(String database, String table) throws Exception {
- IHiveClient hiveClient = HiveClientFactory.getHiveClient();
- return hiveClient.getHiveTableRows(database, table);
- }
-
- private long getDataSize(String database, String table) throws Exception {
- IHiveClient hiveClient = HiveClientFactory.getHiveClient();
- long size = hiveClient.getHiveTableMeta(database, table).fileSize;
- return size;
- }
-
- private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
- hiveCmdBuilder.addStatement(getInitStatement());
- hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
- hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
- hiveCmdBuilder.addStatement(getRedistributeDataStatement());
- final String cmd = hiveCmdBuilder.toString();
-
- stepLogger.log("Redistribute table, cmd: ");
- stepLogger.log(cmd);
-
- Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
- getManager().addJobInfo(getId(), stepLogger.getInfo());
-
- if (response.getFirst() != 0) {
- throw new RuntimeException("Failed to redistribute flat hive table");
- }
- }
-
- private KylinConfig getCubeSpecificConfig() {
- String cubeName = CubingExecutableUtil.getCubeName(getParams());
- CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = manager.getCube(cubeName);
- return cube.getConfig();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- KylinConfig config = getCubeSpecificConfig();
- String intermediateTable = getIntermediateTable();
- String database, tableName;
- if (intermediateTable.indexOf(".") > 0) {
- database = intermediateTable.substring(0, intermediateTable.indexOf("."));
- tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1);
- } else {
- database = config.getHiveDatabaseForIntermediateTable();
- tableName = intermediateTable;
- }
-
- try {
- long rowCount = computeRowCount(database, tableName);
- logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
- if (rowCount == 0) {
- if (!config.isEmptySegmentAllowed()) {
- stepLogger.log("Detect upstream hive table is empty, "
- + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
- } else {
- return new ExecuteResult(ExecuteResult.State.SUCCEED,
- "Row count is 0, no need to redistribute");
- }
- }
-
- int mapperInputRows = config.getHadoopJobMapperInputRows();
-
- int numReducers = Math.round(rowCount / ((float) mapperInputRows));
- numReducers = Math.max(1, numReducers);
- numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
-
- stepLogger.log("total input rows = " + rowCount);
- stepLogger.log("expected input rows per mapper = " + mapperInputRows);
- stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
-
- redistributeTable(config, numReducers);
- long dataSize = getDataSize(database, tableName);
- getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
-
- } catch (Exception e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
- }
- }
-
- public void setInitStatement(String sql) {
- setParam("HiveInit", sql);
- }
-
- public String getInitStatement() {
- return getParam("HiveInit");
- }
-
- public void setRedistributeDataStatement(String sql) {
- setParam("HiveRedistributeData", sql);
- }
-
- public String getRedistributeDataStatement() {
- return getParam("HiveRedistributeData");
- }
-
- public String getIntermediateTable() {
- return getParam("intermediateTable");
- }
-
- public void setIntermediateTable(String intermediateTable) {
- setParam("intermediateTable", intermediateTable);
- }
- }
-
- public static class GarbageCollectionStep extends AbstractExecutable {
- private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- KylinConfig config = context.getConfig();
- StringBuffer output = new StringBuffer();
- try {
- output.append(cleanUpIntermediateFlatTable(config));
- // don't drop view to avoid concurrent issue
- //output.append(cleanUpHiveViewIntermediateTable(config));
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return ExecuteResult.createError(e);
- }
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- }
-
- private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
- StringBuffer output = new StringBuffer();
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- final List<String> hiveTables = this.getIntermediateTables();
- for (String hiveTable : hiveTables) {
- if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
- hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
- hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
-
- output.append("Hive table " + hiveTable + " is dropped. \n");
- }
- }
- config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
- rmdirOnHDFS(getExternalDataPaths());
- output.append("Path " + getExternalDataPaths() + " is deleted. \n");
-
- return output.toString();
- }
-
- private void rmdirOnHDFS(List<String> paths) throws IOException {
- for (String path : paths) {
- Path externalDataPath = new Path(path);
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- if (fs.exists(externalDataPath)) {
- fs.delete(externalDataPath, true);
- }
- }
- }
-
- public void setIntermediateTables(List<String> tableIdentity) {
- setParam("oldHiveTables", StringUtil.join(tableIdentity, ","));
- }
-
- private List<String> getIntermediateTables() {
- List<String> intermediateTables = Lists.newArrayList();
- String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ",");
- for (String t : tables) {
- intermediateTables.add(t);
- }
- return intermediateTables;
- }
-
- public void setExternalDataPaths(List<String> externalDataPaths) {
- setParam("externalDataPaths", StringUtil.join(externalDataPaths, ","));
- }
-
- private List<String> getExternalDataPaths() {
- String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ",");
- List<String> result = Lists.newArrayList();
- for (String s : paths) {
- result.add(s);
- }
- return result;
- }
-
- public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
- setParam("oldHiveViewIntermediateTables", tableIdentities);
- }
- }
}
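Worth noting in this refactor: hiveViewIntermediateTables changes from a hand-built ';'-delimited String (with a substring call to trim the trailing separator) to a List joined with ',' only when handed to the cleanup step. A minimal sketch of the new form:

    List<String> views = com.google.common.collect.Lists.newArrayList("view_a", "view_b");
    String joined = org.apache.kylin.common.util.StringUtil.join(views, ","); // "view_a,view_b"

Since the cleanup step shown here only stores this value (dropping the views themselves is skipped to avoid concurrency issues), the delimiter change does not affect cleanup behavior.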
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index 58bd2c3..daf93d3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
@@ -45,6 +46,8 @@ public class HiveSource implements ISource {
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
if (engineInterface == IMRInput.class) {
return (I) new HiveMRInput();
+ } else if (engineInterface == ISparkInput.class) {
+ return (I) new HiveSparkInput();
} else {
throw new RuntimeException("Cannot adapt to " + engineInterface);
}
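The new branch mirrors the existing MR one: each build engine asks the source to adapt to its own input interface. A hypothetical call site (illustrative only; the real engines resolve sources through Kylin's factory rather than constructing HiveSource directly):

    ISparkInput sparkInput = new HiveSource().adaptToBuildEngine(ISparkInput.class);
    // -> the new HiveSparkInput; IMRInput.class still yields HiveMRInput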
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
new file mode 100644
index 0000000..779835b
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class HiveSparkInput extends HiveInputBase implements ISparkInput {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(HiveSparkInput.class);
+
+ @Override
+ public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ return new BatchCubingInputSide(flatDesc);
+ }
+
+ @Override
+ public ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new ISparkInput.ISparkBatchMergeInputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ // doing nothing
+ }
+ };
+ }
+
+ public class BatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+ final protected IJoinedFlatTableDesc flatDesc;
+ final protected String flatTableDatabase;
+ final protected String hdfsWorkingDir;
+
+ List<String> hiveViewIntermediateTables = Lists.newArrayList();
+
+ public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ this.flatDesc = flatDesc;
+ this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
+ this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
+ }
+
+ @Override
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+ final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+ final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
+ .getConfig();
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+ // create flat table first
+ addStepPhase1_DoCreateFlatTable(jobFlow, hdfsWorkingDir, flatDesc, flatTableDatabase);
+
+ // then count and redistribute
+ if (cubeConfig.isHiveRedistributeEnabled()) {
+ if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
+ }
+ }
+
+ // special for hive
+ addStepPhase1_DoMaterializeLookupTable(jobFlow);
+ }
+
+ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+ AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables);
+ if (task != null) {
+ jobFlow.addTask(task);
+ }
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+ GarbageCollectionStep step = new GarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+ step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+ step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
+ step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
+ jobFlow.addTask(step);
+ }
+
+ private String getIntermediateTableIdentity() {
+ return flatTableDatabase + "." + flatDesc.getTableName();
+ }
+ }
+
+}
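Phase 1 above can add up to three tasks to the chained job, in order: create the flat table, optionally redistribute it, then materialize lookup views. A compile-level sketch of how an engine would drive this input side (jobFlow and flatDesc assumed to exist):

    ISparkInput.ISparkBatchCubingInputSide side =
            new HiveSparkInput().getBatchCubingInputSide(flatDesc);
    side.addStepPhase1_CreateFlatTable(jobFlow); // flat table [+ redistribute] [+ view materialization]
    // ... cubing steps are added by the engine here ...
    side.addStepPhase4_Cleanup(jobFlow);         // drop intermediate tables, delete their HDFS paths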
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
new file mode 100644
index 0000000..0dfb5bf
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/RedistributeFlatHiveTableStep.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+public class RedistributeFlatHiveTableStep extends AbstractExecutable {
+ private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+ private long computeRowCount(String database, String table) throws Exception {
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ return hiveClient.getHiveTableRows(database, table);
+ }
+
+ private long getDataSize(String database, String table) throws Exception {
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ long size = hiveClient.getHiveTableMeta(database, table).fileSize;
+ return size;
+ }
+
+ private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
+ hiveCmdBuilder.addStatement(getInitStatement());
+ hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
+ hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
+ hiveCmdBuilder.addStatement(getRedistributeDataStatement());
+ final String cmd = hiveCmdBuilder.toString();
+
+ stepLogger.log("Redistribute table, cmd: ");
+ stepLogger.log(cmd);
+
+ Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+ getManager().addJobInfo(getId(), stepLogger.getInfo());
+
+ if (response.getFirst() != 0) {
+ throw new RuntimeException("Failed to redistribute flat hive table");
+ }
+ }
+
+ private KylinConfig getCubeSpecificConfig() {
+ String cubeName = CubingExecutableUtil.getCubeName(getParams());
+ CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = manager.getCube(cubeName);
+ return cube.getConfig();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig config = getCubeSpecificConfig();
+ String intermediateTable = getIntermediateTable();
+ String database, tableName;
+ if (intermediateTable.indexOf(".") > 0) {
+ database = intermediateTable.substring(0, intermediateTable.indexOf("."));
+ tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1);
+ } else {
+ database = config.getHiveDatabaseForIntermediateTable();
+ tableName = intermediateTable;
+ }
+
+ try {
+ long rowCount = computeRowCount(database, tableName);
+ logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
+ if (rowCount == 0) {
+ if (!config.isEmptySegmentAllowed()) {
+ stepLogger.log("Detect upstream hive table is empty, "
+ + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\"");
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+ } else {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute");
+ }
+ }
+
+ int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+ int numReducers = Math.round(rowCount / ((float) mapperInputRows));
+ numReducers = Math.max(1, numReducers);
+ numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
+
+ stepLogger.log("total input rows = " + rowCount);
+ stepLogger.log("expected input rows per mapper = " + mapperInputRows);
+ stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
+
+ redistributeTable(config, numReducers);
+ long dataSize = getDataSize(database, tableName);
+ getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+ } catch (Exception e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog(), e);
+ }
+ }
+
+ public void setInitStatement(String sql) {
+ setParam("HiveInit", sql);
+ }
+
+ public String getInitStatement() {
+ return getParam("HiveInit");
+ }
+
+ public void setRedistributeDataStatement(String sql) {
+ setParam("HiveRedistributeData", sql);
+ }
+
+ public String getRedistributeDataStatement() {
+ return getParam("HiveRedistributeData");
+ }
+
+ public String getIntermediateTable() {
+ return getParam("intermediateTable");
+ }
+
+ public void setIntermediateTable(String intermediateTable) {
+ setParam("intermediateTable", intermediateTable);
+ }
+}
\ No newline at end of file
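The reducer sizing in doWork is round-then-clamp: roughly one reducer per mapper-input-rows worth of data, bounded to [1, max]. A worked example with illustrative config values:

    long rowCount = 10500000L;       // from computeRowCount()
    int mapperInputRows = 1000000;   // config.getHadoopJobMapperInputRows(), assumed value
    int maxReducers = 500;           // config.getHadoopJobMaxReducerNumber(), assumed value
    int numReducers = Math.round(rowCount / ((float) mapperInputRows)); // round(10.5f) = 11
    numReducers = Math.min(Math.max(1, numReducers), maxReducers);      // stays 11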
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index fcdb516..74d95cf 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -66,7 +66,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
index a81cc21..34c946a 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
@@ -46,8 +46,7 @@ public class HiveMRInputTest {
DefaultChainedExecutable defaultChainedExecutable = mock(DefaultChainedExecutable.class);
defaultChainedExecutable.setId(UUID.randomUUID().toString());
- HiveMRInput.BatchCubingInputSide batchCubingInputSide = new HiveMRInput.BatchCubingInputSide(null);
- String jobWorkingDir = batchCubingInputSide.getJobWorkingDir(defaultChainedExecutable);
+ String jobWorkingDir = HiveInputBase.getJobWorkingDir(defaultChainedExecutable, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
jobWorkDirPath = new Path(jobWorkingDir);
Assert.assertTrue(fileSystem.exists(jobWorkDirPath));
} finally {
@@ -65,7 +64,7 @@ public class HiveMRInputTest {
StringBuilder hqls = new StringBuilder();
for (int i = 0; i < viewSize; i++) {
- String hql = HiveMRInput.BatchCubingInputSide.materializeViewHql(mockedViewNames[i], mockedTalbeNames[i],
+ String hql = HiveInputBase.materializeViewHql(mockedViewNames[i], mockedTalbeNames[i],
mockedWorkingDir);
hqls.append(hql);
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
new file mode 100644
index 0000000..a624f8f
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.GarbageCollectionStep;
+import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
+import org.apache.kylin.source.kafka.job.MergeOffsetStep;
+
+public class KafkaInputBase {
+
+ protected static AbstractExecutable createMergeOffsetStep(String jobId, CubeSegment cubeSegment) {
+
+ final MergeOffsetStep result = new MergeOffsetStep();
+ result.setName("Merge offset step");
+
+ CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+ return result;
+ }
+
+ protected static MapReduceExecutable createSaveKafkaDataStep(String jobId, String location, CubeSegment seg) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName("Save data from Kafka");
+ result.setMapReduceJobClass(KafkaFlatTableJob.class);
+ JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
+ StringBuilder cmd = new StringBuilder();
+ jobBuilderSupport.appendMapReduceParameters(cmd);
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
+
+ result.setMapReduceParams(cmd.toString());
+ return result;
+ }
+
+ protected static AbstractExecutable createFlatTable(final String hiveTableDatabase, final String mockFactTableName,
+ final String baseLocation, final String cubeName, final CubeDesc cubeDesc,
+ final IJoinedFlatTableDesc flatDesc, final List<String> intermediateTables,
+ final List<String> intermediatePaths) {
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
+
+ final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
+
+ @Override
+ public String getTableName() {
+ return mockFactTableName;
+ }
+
+ @Override
+ public DataModelDesc getDataModel() {
+ return cubeDesc.getModel();
+ }
+
+ @Override
+ public List<TblColRef> getAllColumns() {
+ return flatDesc.getFactColumns();
+ }
+
+ @Override
+ public List<TblColRef> getFactColumns() {
+ return null;
+ }
+
+ @Override
+ public int getColumnIndex(TblColRef colRef) {
+ return 0;
+ }
+
+ @Override
+ public SegmentRange getSegRange() {
+ return null;
+ }
+
+ @Override
+ public TblColRef getDistributedBy() {
+ return null;
+ }
+
+ @Override
+ public TblColRef getClusterBy() {
+ return null;
+ }
+
+ @Override
+ public ISegment getSegment() {
+ return null;
+ }
+
+ @Override
+ public boolean useAlias() {
+ return false;
+ }
+ };
+ final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
+ // the table inputformat is sequence file
+ final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation,
+ JoinedFlatTable.SEQUENCEFILE);
+
+ final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+ final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
+ String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
+ insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ",
+ mockFactTableName + " ");
+
+ CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+ CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+ step.setInitStatement(hiveInitStatements);
+ step.setCreateTableStatement(
+ dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
+ step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+ intermediateTables.add(flatDesc.getTableName());
+ intermediateTables.add(mockFactTableName);
+ intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
+ intermediatePaths.add(baseLocation + "/" + mockFactTableName);
+ return step;
+ }
+
+ protected static AbstractExecutable createGCStep(List<String> intermediateTables, List<String> intermediatePaths) {
+ GarbageCollectionStep step = new GarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+ step.setIntermediateTables(intermediateTables);
+ step.setExternalDataPaths(intermediatePaths);
+
+ return step;
+ }
+}
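When the model has lookup tables, createFlatTable above reads from a mock fact table declared over the sequence files written by the Kafka save step, then rewrites the generated INSERT to select from it. A minimal sketch of that rewrite (names hypothetical; the trailing space in the search key prevents a prefix-only match against a longer table name):

    String rootFactTable = "DEFAULT.STREAMING_SALES"; // flatDesc.getDataModel().getRootFactTableName()
    String mockFactTable = "kylin_intermediate_my_cube_1a2b_fact";
    String insert = "INSERT OVERWRITE TABLE flat SELECT col1, col2 FROM DEFAULT.STREAMING_SALES alias;";
    insert = insert.replace(rootFactTable + " ", mockFactTable + " ");
    // -> "INSERT OVERWRITE TABLE flat SELECT col1, col2 FROM kylin_intermediate_my_cube_1a2b_fact alias;"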
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index bc2426d..a45cc63 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -22,8 +22,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -31,38 +29,25 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
-import org.apache.kylin.source.hive.HiveMRInput;
-import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
-import org.apache.kylin.source.kafka.job.MergeOffsetStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaMRInput implements IMRInput {
+import com.google.common.collect.Lists;
+
+public class KafkaMRInput extends KafkaInputBase implements IMRInput {
private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
private CubeSegment cubeSegment;
@@ -99,7 +84,8 @@ public class KafkaMRInput implements IMRInput {
job.setInputFormatClass(SequenceFileInputFormat.class);
String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
- String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+ String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc,
+ JobBuilderSupport.getJobWorkingDir(conf, jobId));
try {
FileInputFormat.addInputPath(job, new Path(inputPath));
} catch (IOException e) {
@@ -110,7 +96,7 @@ public class KafkaMRInput implements IMRInput {
@Override
public Collection<String[]> parseMapperInput(Object mapperInput) {
Text text = (Text) mapperInput;
- String[] columns = Bytes.toString(text.getBytes(), 0, text.getLength()).split(delimiter);
+ String[] columns = Bytes.toString(text.getBytes(), 0, text.getLength()).split(delimiter);
return Collections.singletonList(columns);
}
@@ -120,7 +106,7 @@ public class KafkaMRInput implements IMRInput {
final JobEngineConfig conf;
final CubeSegment seg;
- private CubeDesc cubeDesc ;
+ private CubeDesc cubeDesc;
private KylinConfig config;
protected IJoinedFlatTableDesc flatDesc;
protected String hiveTableDatabase;
@@ -147,119 +133,23 @@ public class KafkaMRInput implements IMRInput {
// directly use flat table location
final String intermediateFactTable = flatDesc.getTableName();
final String tableLocation = baseLocation + "/" + intermediateFactTable;
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation));
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
intermediatePaths.add(tableLocation);
} else {
- final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase() + "_"
- + seg.getUuid().replaceAll("-", "_") + "_fact";
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName));
- jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation));
+ final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase()
+ + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
+ jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc, flatDesc, intermediateTables, intermediatePaths));
}
}
- private AbstractExecutable createFlatTable(final String mockFactTableName, String baseLocation) {
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase);
-
- final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() {
-
- @Override
- public String getTableName() {
- return mockFactTableName;
- }
-
- @Override
- public DataModelDesc getDataModel() {
- return cubeDesc.getModel();
- }
-
- @Override
- public List<TblColRef> getAllColumns() {
- return flatDesc.getFactColumns();
- }
-
- @Override
- public List<TblColRef> getFactColumns() {
- return null;
- }
-
- @Override
- public int getColumnIndex(TblColRef colRef) {
- return 0;
- }
-
- @Override
- public SegmentRange getSegRange() {
- return null;
- }
-
- @Override
- public TblColRef getDistributedBy() {
- return null;
- }
-
- @Override
- public TblColRef getClusterBy() {
- return null;
- }
-
- @Override
- public ISegment getSegment() {
- return null;
- }
-
- @Override
- public boolean useAlias() {
- return false;
- }
- };
- final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc);
- // the table inputformat is sequence file
- final String createFactTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation, JoinedFlatTable.SEQUENCEFILE);
-
- final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation);
- String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
- insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockFactTableName + " ");
-
- CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
- CubingExecutableUtil.setCubeName(cubeName, step.getParams());
- step.setInitStatement(hiveInitStatements);
- step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls);
- step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-
- intermediateTables.add(flatDesc.getTableName());
- intermediateTables.add(mockFactTableName);
- intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName());
- intermediatePaths.add(baseLocation + "/" + mockFactTableName);
- return step;
- }
protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
}
- private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) {
- MapReduceExecutable result = new MapReduceExecutable();
- result.setName("Save data from Kafka");
- result.setMapReduceJobClass(KafkaFlatTableJob.class);
- JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system");
- StringBuilder cmd = new StringBuilder();
- jobBuilderSupport.appendMapReduceParameters(cmd);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location);
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
- JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step");
-
- result.setMapReduceParams(cmd.toString());
- return result;
- }
-
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
- step.setIntermediateTables(intermediateTables);
- step.setExternalDataPaths(intermediatePaths);
- jobFlow.addTask(step);
+ jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
}
@@ -279,48 +169,8 @@ public class KafkaMRInput implements IMRInput {
@Override
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
-
- final MergeOffsetStep result = new MergeOffsetStep();
- result.setName("Merge offset step");
-
- CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams());
- CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams());
- CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams());
- jobFlow.addTask(result);
+ jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
}
}
- @Deprecated
- public static class GarbageCollectionStep extends AbstractExecutable {
- private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- rmdirOnHDFS(getDataPath());
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return ExecuteResult.createError(e);
- }
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");
- }
-
- private void rmdirOnHDFS(String path) throws IOException {
- Path externalDataPath = new Path(path);
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- if (fs.exists(externalDataPath)) {
- fs.delete(externalDataPath, true);
- }
- }
-
- public void setDataPath(String externalDataPath) {
- setParam("dataPath", externalDataPath);
- }
-
- private String getDataPath() {
- return getParam("dataPath");
- }
-
- }
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 1d65b96..70d37aa 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -29,6 +29,7 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.ISourceAware;
@@ -61,6 +62,8 @@ public class KafkaSource implements ISource {
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
if (engineInterface == IMRInput.class) {
return (I) new KafkaMRInput();
+ } else if (engineInterface == ISparkInput.class) {
+ return (I) new KafkaSparkInput();
} else {
throw new RuntimeException("Cannot adapt to " + engineInterface);
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
new file mode 100644
index 0000000..7600329
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaSparkInput.class);
+ private CubeSegment cubeSegment;
+
+ @Override
+ public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ this.cubeSegment = (CubeSegment) flatDesc.getSegment();
+ return new BatchCubingInputSide(cubeSegment, flatDesc);
+ }
+
+ @Override
+ public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new KafkaSparkBatchMergeInputSide((CubeSegment) seg);
+ }
+
+ public static class BatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+ final JobEngineConfig conf;
+ final CubeSegment seg;
+ private CubeDesc cubeDesc;
+ private KylinConfig config;
+ protected IJoinedFlatTableDesc flatDesc;
+ protected String hiveTableDatabase;
+ final private List<String> intermediateTables = Lists.newArrayList();
+ final private List<String> intermediatePaths = Lists.newArrayList();
+ private String cubeName;
+
+ public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+ this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+ this.config = seg.getConfig();
+ this.flatDesc = flatDesc;
+ this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
+ this.seg = seg;
+ this.cubeDesc = seg.getCubeDesc();
+ this.cubeName = seg.getCubeInstance().getName();
+ }
+
+ @Override
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+
+ boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+ final String baseLocation = getJobWorkingDir(jobFlow);
+ if (onlyOneTable) {
+ // directly use flat table location
+ final String intermediateFactTable = flatDesc.getTableName();
+ final String tableLocation = baseLocation + "/" + intermediateFactTable;
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
+ intermediatePaths.add(tableLocation);
+ } else {
+ final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase()
+ + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
+ jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
+ flatDesc, intermediateTables, intermediatePaths));
+ }
+ }
+
+ protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+ return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+
+ }
+ }
+
+ class KafkaSparkBatchMergeInputSide implements ISparkBatchMergeInputSide {
+
+ private CubeSegment cubeSegment;
+
+ KafkaSparkBatchMergeInputSide(CubeSegment cubeSegment) {
+ this.cubeSegment = cubeSegment;
+ }
+
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+ }
+ }
+
+}
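For orientation, the new else-if branch in KafkaSource.adaptToBuildEngine (hunk above) is what hands the Spark engine its Kafka input side. A minimal sketch of the dispatch, assuming direct no-arg instantiation of KafkaSource stands in for Kylin's reflective ISource loading:

    import org.apache.kylin.engine.spark.ISparkInput;
    import org.apache.kylin.source.kafka.KafkaSource;

    public class AdaptToSparkSketch {
        public static void main(String[] args) {
            // assumption: direct instantiation stands in for Kylin's reflective loading
            KafkaSource source = new KafkaSource();
            // the new branch resolves ISparkInput to KafkaSparkInput
            ISparkInput input = source.adaptToBuildEngine(ISparkInput.class);
            System.out.println(input.getClass().getName());
            // expected: org.apache.kylin.source.kafka.KafkaSparkInput
        }
    }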
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index a2494c2..c1b4cea 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -42,6 +42,11 @@
<artifactId>kylin-engine-mr</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-spark</artifactId>
+ </dependency>
+
<!-- Env & Test -->
<dependency>
<groupId>org.apache.kylin</groupId>
@@ -102,6 +107,13 @@
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- Spark dependency -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index d24285b..ded6598 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -18,10 +18,10 @@
package org.apache.kylin.storage.hbase;
-import com.google.common.base.Preconditions;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.spark.ISparkOutput;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.model.IStorageAware;
@@ -32,6 +32,9 @@ import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.storage.IStorage;
import org.apache.kylin.storage.IStorageQuery;
import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
+import org.apache.kylin.storage.hbase.steps.HBaseSparkOutputTransition;
+
+import com.google.common.base.Preconditions;
@SuppressWarnings("unused")
//used by reflection
@@ -47,14 +50,16 @@ public class HBaseStorage implements IStorage {
CubeInstance cubeInstance = (CubeInstance) realization;
String cubeStorageQuery;
if (cubeInstance.getStorageType() == IStorageAware.ID_HBASE) {//v2 query engine cannot go with v1 storage now
- throw new IllegalStateException("Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
+ throw new IllegalStateException(
+ "Storage Engine (id=" + IStorageAware.ID_HBASE + ") is not supported any more");
} else {
cubeStorageQuery = v2CubeStorageQuery;//by default use v2
}
IStorageQuery ret;
try {
- ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class).newInstance((CubeInstance) realization);
+ ret = (IStorageQuery) Class.forName(cubeStorageQuery).getConstructor(CubeInstance.class)
+ .newInstance((CubeInstance) realization);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize storage query for " + cubeStorageQuery, e);
}
@@ -67,11 +72,13 @@ public class HBaseStorage implements IStorage {
private static TblColRef getPartitionCol(IRealization realization) {
String modelName = realization.getModel().getName();
- DataModelDesc dataModelDesc = DataModelManager.getInstance(KylinConfig.getInstanceFromEnv()).getDataModelDesc(modelName);
+ DataModelDesc dataModelDesc = DataModelManager.getInstance(KylinConfig.getInstanceFromEnv())
+ .getDataModelDesc(modelName);
PartitionDesc partitionDesc = dataModelDesc.getPartitionDesc();
Preconditions.checkArgument(partitionDesc != null, "PartitionDesc for " + realization + " is null!");
TblColRef partitionColRef = partitionDesc.getPartitionDateColumnRef();
- Preconditions.checkArgument(partitionColRef != null, "getPartitionDateColumnRef for " + realization + " is null");
+ Preconditions.checkArgument(partitionColRef != null,
+ "getPartitionDateColumnRef for " + realization + " is null");
return partitionColRef;
}
@@ -80,6 +87,8 @@ public class HBaseStorage implements IStorage {
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
if (engineInterface == IMROutput2.class) {
return (I) new HBaseMROutput2Transition();
+ } else if (engineInterface == ISparkOutput.class) {
+ return (I) new HBaseSparkOutputTransition();
} else {
throw new RuntimeException("Cannot adapt to " + engineInterface);
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
similarity index 87%
copy from storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
copy to storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 28ccca8..4fda139 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -18,36 +18,41 @@
package org.apache.kylin.storage.hbase.steps;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
-import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.storage.hbase.HBaseConnection;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
-public class HBaseMRSteps extends JobBuilderSupport {
+/**
+ * Common steps for building a cube into HBase.
+ */
+public abstract class HBaseJobSteps extends JobBuilderSupport {
- public HBaseMRSteps(CubeSegment seg) {
+ public HBaseJobSteps(CubeSegment seg) {
super(seg, null);
}
public HadoopShellExecutable createCreateHTableStep(String jobId) {
return createCreateHTableStep(jobId, CuboidModeEnum.CURRENT);
}
-
+
+ // TODO make it abstract
public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum cuboidMode) {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
@@ -63,8 +68,8 @@ public class HBaseMRSteps extends JobBuilderSupport {
return createHtableStep;
}
+ // TODO make it abstract
public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
-
final List<String> mergingCuboidPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -88,29 +93,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
return mergeCuboidDataStep;
}
- public MapReduceExecutable createConvertCuboidToHfileStep(String jobId) {
- String cuboidRootPath = getCuboidRootPath(jobId);
- String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
-
- MapReduceExecutable createHFilesStep = new MapReduceExecutable();
- createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd);
- appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
- appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
- appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId));
- appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
-
- createHFilesStep.setMapReduceParams(cmd.toString());
- createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
- createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-
- return createHFilesStep;
- }
+ public abstract AbstractExecutable createConvertCuboidToHfileStep(String jobId);
+ // TODO make it abstract
public HadoopShellExecutable createBulkLoadStep(String jobId) {
HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
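The copy-then-abstract refactoring above follows a template-method shape: HBaseJobSteps keeps the engine-neutral steps (create HTable, merge cuboids, bulk load) while each engine supplies its own cuboid-to-HFile conversion. A toy sketch of that shape, using simplified stand-in names rather than the real Kylin classes:

    abstract class JobStepsSketch {
        // engine-specific: MR submits CubeHFileJob, Spark submits SparkCubeHFile
        abstract String createConvertCuboidToHfileStep(String jobId);

        // engine-neutral chain shared by both subclasses
        String buildChain(String jobId) {
            return "createHTable -> " + createConvertCuboidToHfileStep(jobId) + " -> bulkLoad";
        }
    }

    class MRStepsSketch extends JobStepsSketch {
        String createConvertCuboidToHfileStep(String jobId) { return "CubeHFileJob(MR)"; }
    }

    class SparkStepsSketch extends JobStepsSketch {
        String createConvertCuboidToHfileStep(String jobId) { return "SparkCubeHFile"; }
    }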
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index ea372d9..94a60f8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -42,6 +42,7 @@ import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IEngineAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,8 +63,13 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+
+ boolean useSpark = seg.getCubeDesc().getEngineType() == IEngineAware.ID_SPARK;
+
+ // TODO need refactor
+ final HBaseJobSteps steps = useSpark ? new HBaseSparkSteps(seg) : new HBaseMRSteps(seg);
+
return new IMRBatchCubingOutputSide2() {
- HBaseMRSteps steps = new HBaseMRSteps(seg);
@Override
public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 28ccca8..25683d3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -18,77 +18,26 @@
package org.apache.kylin.storage.hbase.steps;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.job.execution.AbstractExecutable;
-import java.util.ArrayList;
-import java.util.List;
-
-public class HBaseMRSteps extends JobBuilderSupport {
+public class HBaseMRSteps extends HBaseJobSteps {
public HBaseMRSteps(CubeSegment seg) {
- super(seg, null);
+ super(seg);
}
public HadoopShellExecutable createCreateHTableStep(String jobId) {
return createCreateHTableStep(jobId, CuboidModeEnum.CURRENT);
}
-
- public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum cuboidMode) {
- HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
- createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
- appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
- appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
- createHtableStep.setJobParams(cmd.toString());
- createHtableStep.setJobClass(CreateHTableJob.class);
-
- return createHtableStep;
- }
-
- public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
-
- final List<String> mergingCuboidPaths = Lists.newArrayList();
- for (CubeSegment merging : mergingSegments) {
- mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
- }
- String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
- String outputPath = getCuboidRootPath(jobID);
-
- MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
- mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd);
- appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
- appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
- appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
- appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
- mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(clazz);
- return mergeCuboidDataStep;
- }
-
- public MapReduceExecutable createConvertCuboidToHfileStep(String jobId) {
+ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
String cuboidRootPath = getCuboidRootPath(jobId);
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
@@ -110,158 +59,4 @@ public class HBaseMRSteps extends JobBuilderSupport {
return createHFilesStep;
}
-
- public HadoopShellExecutable createBulkLoadStep(String jobId) {
- HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
- bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
- appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-
- bulkLoadStep.setJobParams(cmd.toString());
- bulkLoadStep.setJobClass(BulkLoadJob.class);
-
- return bulkLoadStep;
- }
-
- public MergeGCStep createMergeGCStep() {
- MergeGCStep result = new MergeGCStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
- result.setOldHTables(getMergingHTables());
- return result;
- }
-
- public MergeGCStep createOptimizeGCStep() {
- MergeGCStep result = new MergeGCStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- result.setOldHTables(getOptimizeHTables());
- return result;
- }
-
- public List<CubeSegment> getOptimizeSegments() {
- CubeInstance cube = (CubeInstance) seg.getRealization();
- List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
- List<CubeSegment> oldSegments = Lists.newArrayListWithExpectedSize(newSegments.size());
- for (CubeSegment segment : newSegments) {
- oldSegments.add(cube.getOriginalSegmentToOptimize(segment));
- }
- return oldSegments;
- }
-
- public List<String> getOptimizeHTables() {
- return getOldHTables(getOptimizeSegments());
- }
-
- public List<String> getOldHTables(final List<CubeSegment> oldSegments) {
- final List<String> oldHTables = Lists.newArrayListWithExpectedSize(oldSegments.size());
- for (CubeSegment segment : oldSegments) {
- oldHTables.add(segment.getStorageLocationIdentifier());
- }
- return oldHTables;
- }
-
- public List<String> getMergingHTables() {
- final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
- final List<String> mergingHTables = Lists.newArrayList();
- for (CubeSegment merging : mergingSegments) {
- mergingHTables.add(merging.getStorageLocationIdentifier());
- }
- return mergingHTables;
- }
-
- public List<String> getMergingHDFSPaths() {
- final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
- final List<String> mergingHDFSPaths = Lists.newArrayList();
- for (CubeSegment merging : mergingSegments) {
- mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
- }
- return mergingHDFSPaths;
- }
-
- public List<String> getOptimizeHDFSPaths() {
- return getOldHDFSPaths(getOptimizeSegments());
- }
-
- public List<String> getOldHDFSPaths(final List<CubeSegment> oldSegments) {
- final List<String> oldHDFSPaths = Lists.newArrayListWithExpectedSize(oldSegments.size());
- for (CubeSegment oldSegment : oldSegments) {
- oldHDFSPaths.add(getJobWorkingDir(oldSegment.getLastBuildJobID()));
- }
- return oldHDFSPaths;
- }
-
- public String getHFilePath(String jobId) {
- return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
- }
-
- public String getRowkeyDistributionOutputPath(String jobId) {
- return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
- }
-
- public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
- String jobId = jobFlow.getId();
-
- List<String> toDeletePaths = new ArrayList<>();
- toDeletePaths.add(getOptimizationRootPath(jobId));
-
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
-
- jobFlow.addTask(step);
- }
-
- public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
- String jobId = jobFlow.getId();
-
- jobFlow.addTask(createOptimizeGCStep());
-
- List<String> toDeletePaths = new ArrayList<>();
- toDeletePaths.addAll(getOptimizeHDFSPaths());
-
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
-
- jobFlow.addTask(step);
- }
-
- public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
- String jobId = jobFlow.getId();
-
- jobFlow.addTask(createMergeGCStep());
-
- List<String> toDeletePaths = new ArrayList<>();
- toDeletePaths.addAll(getMergingHDFSPaths());
- toDeletePaths.add(getHFilePath(jobId));
-
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
-
- jobFlow.addTask(step);
- }
-
- public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
- String jobId = jobFlow.getId();
-
- List<String> toDeletePaths = new ArrayList<>();
- toDeletePaths.add(getFactDistinctColumnsPath(jobId));
- toDeletePaths.add(getHFilePath(jobId));
-
- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
-
- jobFlow.addTask(step);
- }
-
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
new file mode 100644
index 0000000..08f58af
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.engine.spark.ISparkOutput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "Transition" impl generates cuboid files and then convert to HFile.
+ * The additional step slows down build process, but the gains is merge
+ * can read from HDFS instead of over HBase region server. See KYLIN-1007.
+ *
+ * This is transitional because finally we want to merge from HTable snapshot.
+ * However multiple snapshots as MR input is only supported by HBase 1.x.
+ * Before most users upgrade to latest HBase, they can only use this transitional
+ * cuboid file solution.
+ */
+public class HBaseSparkOutputTransition implements ISparkOutput {
+
+ @SuppressWarnings("unused")
+ private static final Logger logger = LoggerFactory.getLogger(HBaseSparkOutputTransition.class);
+
+ @Override
+ public ISparkBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) {
+ final HBaseJobSteps steps = new HBaseSparkSteps(seg);
+
+ return new ISparkBatchCubingOutputSide() {
+
+ @Override
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ // nothing to do
+ }
+
+ };
+ }
+
+ @Override
+ public ISparkBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
+ return new ISparkBatchMergeOutputSide() {
+ HBaseSparkSteps steps = new HBaseSparkSteps(seg);
+
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
+ DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(
+ steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+ steps.addMergingGarbageCollectionSteps(jobFlow);
+ }
+
+ };
+ }
+
+ public ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(final CubeSegment seg) {
+ return null;
+ }
+}
\ No newline at end of file
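For reference, a hypothetical driver showing the order in which the Spark cubing job builder invokes the output-side phases defined above (the seg and jobFlow arguments are assumed to come from the surrounding job builder):

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.spark.ISparkOutput;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.storage.hbase.steps.HBaseSparkOutputTransition;

    public class OutputSideSketch {
        static void wire(CubeSegment seg, DefaultChainedExecutable jobFlow) {
            ISparkOutput output = new HBaseSparkOutputTransition();
            ISparkOutput.ISparkBatchCubingOutputSide side = output.getBatchCubingOutputSide(seg);
            side.addStepPhase2_BuildDictionary(jobFlow); // creates the HTable
            side.addStepPhase3_BuildCube(jobFlow);       // convert cuboids to HFiles, then bulk load
            side.addStepPhase4_Cleanup(jobFlow);         // no-op for this output
        }
    }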
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
new file mode 100644
index 0000000..c2794ff
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
+import org.apache.kylin.engine.spark.SparkExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+public class HBaseSparkSteps extends HBaseJobSteps {
+
+ public HBaseSparkSteps(CubeSegment seg) {
+ super(seg);
+ }
+
+ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
+ String cuboidRootPath = getCuboidRootPath(jobId);
+ String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
+
+ SparkBatchCubingJobBuilder2 jobBuilder2 = new SparkBatchCubingJobBuilder2(seg, null);
+ final SparkExecutable sparkExecutable = new SparkExecutable();
+ sparkExecutable.setClassName(SparkCubeHFile.class.getName());
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_INPUT_PATH.getOpt(), inputPath);
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_META_URL.getOpt(),
+ jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(), getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+
+ sparkExecutable.setJobId(jobId);
+
+ StringBuilder jars = new StringBuilder();
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class));
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocol.jar
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compat.jar
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.apache.htrace.Trace", null)); // htrace-core.jar
+ StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+ sparkExecutable.setJars(jars.toString());
+
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+ sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+ return sparkExecutable;
+ }
+
+}
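The jar list above is what ships the HBase classes to the Spark executors; ClassUtil.findContainingJar locates, on the local classpath, the jar that contains a given class. A small standalone sketch of the same mechanism, printing rather than submitting (the separator produced by StringUtil.appendWithSeparator is assumed to match what SparkExecutable expects for its jar list):

    import org.apache.kylin.common.util.ClassUtil;
    import org.apache.kylin.common.util.StringUtil;

    public class JarListSketch {
        public static void main(String[] args) {
            StringBuilder jars = new StringBuilder();
            // resolve the jar files backing two of the HBase classes the job needs
            StringUtil.appendWithSeparator(jars,
                    ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class));
            StringUtil.appendWithSeparator(jars,
                    ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
            System.out.println(jars); // e.g. .../hbase-common.jar,.../hbase-server.jar
        }
    }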
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
index 222c9f4..c67d54d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -18,6 +18,7 @@
package org.apache.kylin.storage.hbase.steps;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
@@ -32,7 +33,7 @@ import org.apache.kylin.metadata.model.MeasureDesc;
/**
* @author George Song (ysong1)
*/
-public class KeyValueCreator {
+public class KeyValueCreator implements Serializable {
byte[] cfBytes;
byte[] qBytes;
long timestamp;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
index 599a3f4..bba68da 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowKeyWritable.java
@@ -18,19 +18,21 @@
package org.apache.kylin.storage.hbase.steps;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
-public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.kylin.common.util.BytesUtil;
+
+public class RowKeyWritable implements WritableComparable<RowKeyWritable>, Serializable {
private byte[] data;
private int offset;
private int length;
- private KeyValue.KVComparator kvComparator = new KeyValue.KVComparator();
+ private final static SerializableKVComparator kvComparator = new SerializableKVComparator();
static {
WritableComparator.define(RowKeyWritable.class, new RowKeyComparator());
@@ -56,6 +58,18 @@ public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
this.length = length;
}
+ public byte[] getData() {
+ return data;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.length);
@@ -70,12 +84,23 @@ public class RowKeyWritable implements WritableComparable<RowKeyWritable> {
this.offset = 0;
}
+ @Override
public int compareTo(RowKeyWritable other) {
return kvComparator.compare(this.data, this.offset, this.length, other.data, other.offset, other.length);
}
- public static class RowKeyComparator extends WritableComparator {
- private KeyValue.KVComparator kvComparator = new KeyValue.KVComparator();
+ @Override
+ public String toString() {
+ return BytesUtil.toHex(data, offset, length);
+ }
+
+ public static class SerializableKVComparator extends KeyValue.KVComparator implements Serializable {
+
+ }
+
+ public static class RowKeyComparator extends WritableComparator implements Serializable {
+ public static final RowKeyComparator INSTANCE = new RowKeyComparator();
+ private SerializableKVComparator kvComparator = new SerializableKVComparator();
private static final int LENGTH_BYTES = 4;
@Override
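The Serializable markers added above are not cosmetic: Spark serializes the partitioner and the comparator passed to repartitionAndSortWithinPartitions and ships them to the executors, so a plain KeyValue.KVComparator or WritableComparator would fail closure serialization. An illustrative check, assuming plain Java serialization stands in for what Spark does internally:

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import org.apache.kylin.storage.hbase.steps.RowKeyWritable;

    public class ComparatorSerializationSketch {
        public static void main(String[] args) throws Exception {
            // round-trip the comparator the Spark job hands to
            // repartitionAndSortWithinPartitions (see SparkCubeHFile below)
            try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
                oos.writeObject(RowKeyWritable.RowKeyComparator.INSTANCE);
            }
            System.out.println("RowKeyComparator is serializable");
        }
    }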
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
new file mode 100644
index 0000000..b2571ae
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.spark.KylinSparkJobListener;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import scala.Tuple2;
+
+/**
+ * Spark application that converts cuboid files into HFiles for bulk loading into HBase; metadata is read from the HDFS metadata url.
+ */
+public class SparkCubeHFile extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkCubeHFile.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
+ .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+
+ private Options options;
+
+ private KylinSparkJobListener jobListener;
+
+ public SparkCubeHFile() {
+ options = new Options();
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_PARTITION_FILE_PATH);
+ jobListener = new KylinSparkJobListener();
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+
+ Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
+ KeyValue.class, RowKeyWritable.class };
+
+ SparkConf conf = new SparkConf().setAppName("Covnerting Hfile for:" + cubeName + " segment " + segmentId);
+ //serialization conf
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+ conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ sc.sc().addSparkListener(jobListener);
+ final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
+ if (!fs.exists(partitionFilePath)) {
+ throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
+ }
+
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+
+ final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
+ final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+ }
+ }
+
+ final int cfNum = keyValueCreators.size();
+ final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
+
+ logger.info("Input path: {}", inputPath);
+ logger.info("Output path: {}", outputPath);
+
+ List<JavaPairRDD> inputRDDs = parseInputPath(inputPath, fs, sc);
+ final JavaPairRDD<Text, Text> allCuboidFile = sc.union(inputRDDs.toArray(new JavaPairRDD[inputRDDs.size()]));
+ final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+ if (quickPath) {
+ hfilerdd = allCuboidFile.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+ KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+ textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+ return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
+ }
+ });
+ } else {
+ hfilerdd = allCuboidFile
+ .flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+ throws Exception {
+
+ List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+ Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+ inputCodec.decode(
+ ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+ inputMeasures);
+
+ for (int i = 0; i < cfNum; i++) {
+ KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+ result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+ outputValue));
+ }
+
+ return result.iterator();
+ }
+ });
+ }
+
+ allCuboidFile.unpersist();
+
+ // read partition split keys
+ List<RowKeyWritable> keys = new ArrayList<>();
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
+ RowKeyWritable key = new RowKeyWritable();
+ Writable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ keys.add(key);
+ logger.info(" ------- split key: " + key);
+ key = new RowKeyWritable(); // important: allocate a new object, as the reader reuses the key instance
+ }
+ }
+
+ logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() + 1) + " hfiles");
+
+ final JavaPairRDD<ImmutableBytesWritable, KeyValue> hfilerdd2 = hfilerdd
+ .repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+ RowKeyWritable.RowKeyComparator.INSTANCE)
+ .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(
+ Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+ return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+ rowKeyWritableKeyValueTuple2._2);
+ }
+ });
+
+ Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+ HadoopUtil.healSickConfig(hbaseConf);
+ Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
+ HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
+ try {
+ HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+ } catch (IOException ioe) {
+ // this can be ignored.
+ logger.debug(ioe.getMessage());
+ }
+
+ hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
+ HFileOutputFormat2.class, job.getConfiguration());
+
+ System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+ // deleteHDFSMeta(metaUrl);
+ }
+
+ private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {
+ List<JavaPairRDD> inputRDDs = Lists.newArrayList();
+ Path inputHDFSPath = new Path(inputPath);
+ FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+ boolean hasDir = false;
+ for (FileStatus stat : fileStatuses) {
+ if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+ hasDir = true;
+ inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), Text.class, Text.class));
+ }
+ }
+
+ if (!hasDir) {
+ inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), Text.class, Text.class));
+ }
+
+ return inputRDDs;
+ }
+
+ static class HFilePartitioner extends Partitioner {
+ private List<RowKeyWritable> keys;
+
+ public HFilePartitioner(List<RowKeyWritable> splitKeys) {
+ keys = splitKeys;
+ }
+
+ @Override
+ public int numPartitions() {
+ return keys.size() + 1;
+ }
+
+ @Override
+ public int getPartition(Object o) {
+ int pos = Collections.binarySearch(this.keys, (RowKeyWritable) o) + 1;
+ return pos < 0 ? -pos : pos;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ HFilePartitioner that = (HFilePartitioner) o;
+ return Objects.equals(keys, that.keys);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keys);
+ }
+ }
+
+ protected void deleteHDFSMeta(String metaUrl) throws IOException {
+ String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
+ HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
+ logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
+ }
+
+}
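A worked example of the getPartition arithmetic above, with toy integer keys standing in for RowKeyWritable split keys: Collections.binarySearch returns the index when the row equals a split key and (-(insertionPoint) - 1) otherwise, so adding 1 and negating when negative yields the insertion point, giving keys.size() + 1 partitions with exact split keys routed to the right-hand partition:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class PartitionMathSketch {
        static int partition(List<Integer> keys, Integer row) {
            int pos = Collections.binarySearch(keys, row) + 1;
            return pos < 0 ? -pos : pos;
        }

        public static void main(String[] args) {
            List<Integer> splits = Arrays.asList(10, 20, 30); // must be sorted
            System.out.println(partition(splits, 5));   // 0: before the first split
            System.out.println(partition(splits, 20));  // 2: exact split key goes right
            System.out.println(partition(splits, 25));  // 2: between 20 and 30
            System.out.println(partition(splits, 99));  // 3: last of the 4 partitions
        }
    }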
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HFilePartitionerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HFilePartitionerTest.java
new file mode 100644
index 0000000..827aef8
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HFilePartitionerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HFilePartitionerTest {
+
+ @Test
+ public void testPartitioner() {
+ String[] splitKeys = new String[] {
+ "\\x00\\x0A\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x5F\\x03\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x02\\x00\\x01\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x0A\\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x58\\xF3\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x02\\x00\\x02\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x0A\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x58\\x5C\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x0A\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x7C\\x50\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x02\\x00\\x03\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x0A\\x00\\x03\\x00\\x00\\x00\\x00\\x00\\x00\\x5B\\xF3\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x02\\x00\\x04\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF",
+ "\\x00\\x0A\\x00\\x04\\x00\\x00\\x00\\x00\\x00\\x00\\x5B\\xFC\\x00\\x7F\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF\\xFF", };
+
+ ArrayList<RowKeyWritable> keys = new ArrayList();
+ for (int i = 0; i < splitKyes.length; i++) {
+ byte[] bytes = BytesUtil.fromHex(splitKyes[i]);
+ RowKeyWritable rowKeyWritable = new RowKeyWritable(bytes);
+ keys.add(rowKeyWritable);
+ }
+
+ SparkCubeHFile.HFilePartitioner partitioner = new SparkCubeHFile.HFilePartitioner(keys);
+
+ String testRowKey = "\\x00\\x11\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x40\\xAC\\x0B\\x37\\xF9\\x05\\x04\\x02\\x00\\x02\\x46\\x32\\x4D\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x04";
+
+ int partition = partitioner.getPartition(new RowKeyWritable(BytesUtil.fromHex(testRowKey)));
+
+ Assert.assertEquals(4, partition);
+
+ testRowKey = "\\x00\\x0D\\x00\\x04\\x00\\x00\\x00\\x00\\x00\\x00\\x40\\x00\\x0B\\x39\\x6C\\x02\\x46\\x32\\x4D\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x04";
+ partition = partitioner.getPartition(new RowKeyWritable(BytesUtil.fromHex(testRowKey)));
+
+ Assert.assertEquals(9, partition);
+
+ }
+
+}