You are viewing a plain text version of this content. The canonical link was lost in plain-text conversion; the referenced commit is 195640eb0499a449a02b412af7e247cdcde47cec in https://gitbox.apache.org/repos/asf/kylin.git.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/11 08:19:03 UTC
[kylin] 01/02: KYLIN-3710 JDBC data source support Spark cubing
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 195640eb0499a449a02b412af7e247cdcde47cec
Author: chao long <wa...@qq.com>
AuthorDate: Mon Dec 10 13:04:59 2018 +0800
KYLIN-3710 JDBC data source support Spark cubing
---
.../java/org/apache/kylin/engine/mr/IInput.java | 31 +--
.../java/org/apache/kylin/engine/mr/IMRInput.java | 24 +--
.../java/org/apache/kylin/engine/mr/MRUtil.java | 7 +-
.../org/apache/kylin/engine/spark/ISparkInput.java | 24 +--
.../org/apache/kylin/engine/spark/SparkUtil.java | 10 +-
.../apache/kylin/source/hive/HiveInputBase.java | 90 ++++++++-
.../org/apache/kylin/source/hive/HiveMRInput.java | 103 ++--------
.../apache/kylin/source/hive/HiveSparkInput.java | 84 +-------
...JdbcHiveMRInput.java => JdbcHiveInputBase.java} | 49 ++---
.../apache/kylin/source/jdbc/JdbcHiveMRInput.java | 225 +++------------------
.../kylin/source/jdbc/JdbcHiveSparkInput.java | 48 +++++
.../org/apache/kylin/source/jdbc/JdbcSource.java | 3 +
...JdbcHiveMRInput.java => JdbcHiveInputBase.java} | 18 +-
.../source/jdbc/extensible/JdbcHiveMRInput.java | 116 +++--------
.../source/jdbc/extensible/JdbcHiveSparkInput.java | 55 +++++
.../kylin/source/jdbc/extensible/JdbcSource.java | 3 +
.../jdbc/extensible/JdbcHiveMRInputTest.java | 12 +-
.../apache/kylin/source/kafka/KafkaInputBase.java | 77 ++++++-
.../apache/kylin/source/kafka/KafkaMRInput.java | 88 +-------
.../apache/kylin/source/kafka/KafkaSparkInput.java | 86 +-------
20 files changed, 413 insertions(+), 740 deletions(-)
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
similarity index 66%
copy from engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 5459c70..758b081 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -6,44 +6,31 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-package org.apache.kylin.engine.spark;
+package org.apache.kylin.engine.mr;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
-/**
- * Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
- */
-public interface ISparkInput {
+public interface IInput {
/** Return a helper to participate in batch cubing job flow. */
- public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
/** Return a helper to participate in batch cubing merge job flow. */
- public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
- /**
- * Participate the batch cubing flow as the input side. Responsible for creating
- * intermediate flat table (Phase 1) and clean up any leftover (Phase 4).
- *
- * - Phase 1: Create Flat Table
- * - Phase 2: Build Dictionary (with FlatTableInputFormat)
- * - Phase 3: Build Cube (with FlatTableInputFormat)
- * - Phase 4: Update Metadata & Cleanup
- */
- public interface ISparkBatchCubingInputSide {
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+ public interface IBatchCubingInputSide {
/** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
@@ -51,7 +38,7 @@ public interface ISparkInput {
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
- public interface ISparkBatchMergeInputSide {
+ public interface IBatchMergeInputSide {
/** Add step that executes before merge dictionary and before merge cube. */
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index c259c4e..74153e0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -22,25 +22,16 @@ import java.util.Collection;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.TableDesc;
/**
* Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
*/
-public interface IMRInput {
-
- /** Return a helper to participate in batch cubing job flow. */
- public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+public interface IMRInput extends IInput {
/** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid);
- /** Return a helper to participate in batch cubing merge job flow. */
- public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
/**
* Utility that configures mapper to read from a table.
*/
@@ -65,22 +56,13 @@ public interface IMRInput {
* - Phase 3: Build Cube (with FlatTableInputFormat)
* - Phase 4: Update Metadata & Cleanup
*/
- public interface IMRBatchCubingInputSide {
+ public interface IMRBatchCubingInputSide extends IBatchCubingInputSide {
/** Return an InputFormat that reads from the intermediate flat table */
public IMRTableInputFormat getFlatTableInputFormat();
-
- /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-
- /** Add step that does necessary clean up, like delete the intermediate flat table */
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
- public interface IMRBatchMergeInputSide {
-
- /** Add step that executes before merge dictionary and before merge cube. */
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+ public interface IMRBatchMergeInputSide extends IBatchMergeInputSide {
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 3a0fb84..60d0445 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.dict.lookup.LookupProviderFactory;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchMergeInputSide;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
@@ -39,7 +40,7 @@ public class MRUtil {
public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
- return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
+ return (IMRBatchCubingInputSide)SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
}
public static IMRTableInputFormat getTableInputFormat(String tableName, String prj, String uuid) {
@@ -63,8 +64,8 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
- public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
- return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+ public static IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return (IMRBatchMergeInputSide)SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
}
public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
index 5459c70..4af616c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
@@ -18,20 +18,12 @@
package org.apache.kylin.engine.spark;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.engine.mr.IInput;
/**
* Any ISource that wishes to serve as input of MapReduce build engine must adapt to this interface.
*/
-public interface ISparkInput {
-
- /** Return a helper to participate in batch cubing job flow. */
- public ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
-
- /** Return a helper to participate in batch cubing merge job flow. */
- public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+public interface ISparkInput extends IInput {
/**
* Participate the batch cubing flow as the input side. Responsible for creating
@@ -42,19 +34,11 @@ public interface ISparkInput {
* - Phase 3: Build Cube (with FlatTableInputFormat)
* - Phase 4: Update Metadata & Cleanup
*/
- public interface ISparkBatchCubingInputSide {
+ public interface ISparkBatchCubingInputSide extends IBatchCubingInputSide {
- /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
-
- /** Add step that does necessary clean up, like delete the intermediate flat table */
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
- public interface ISparkBatchMergeInputSide {
-
- /** Add step that executes before merge dictionary and before merge cube. */
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+ public interface ISparkBatchMergeInputSide extends IBatchMergeInputSide {
}
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 82a1a9b..1c4086d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -43,6 +43,8 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchCubingInputSide;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchMergeInputSide;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.function.Function;
@@ -52,9 +54,9 @@ import org.apache.spark.sql.SparkSession;
public class SparkUtil {
- public static ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+ public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
- return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
+ return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
}
public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
@@ -65,8 +67,8 @@ public class SparkUtil {
return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchMergeOutputSide(seg);
}
- public static ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
- return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
+ public static ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return (ISparkBatchMergeInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchMergeInputSide(seg);
}
public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index c55015b..2f25e50 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -19,16 +19,22 @@
package org.apache.kylin.source.hive;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
+import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
@@ -48,9 +54,82 @@ import com.google.common.collect.Sets;
public class HiveInputBase {
- @SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class);
+ public static class BaseBatchCubingInputSide implements IInput.IBatchCubingInputSide {
+
+ final protected IJoinedFlatTableDesc flatDesc;
+ final protected String flatTableDatabase;
+ final protected String hdfsWorkingDir;
+
+ List<String> hiveViewIntermediateTables = Lists.newArrayList();
+
+ public BaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ this.flatDesc = flatDesc;
+ this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
+ this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
+ }
+
+ @Override
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+ final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+ final KylinConfig cubeConfig = cubeInstance.getConfig();
+
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+ // create flat table first
+ addStepPhase1_DoCreateFlatTable(jobFlow);
+
+ // then count and redistribute
+ if (cubeConfig.isHiveRedistributeEnabled()) {
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
+ cubeInstance.getDescriptor()));
+ }
+
+ // special for hive
+ addStepPhase1_DoMaterializeLookupTable(jobFlow);
+ }
+
+ protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
+ final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+ jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
+ }
+
+ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+ AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
+ flatDesc, hiveViewIntermediateTables, jobFlow.getId());
+ if (task != null) {
+ jobFlow.addTask(task);
+ }
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+
+ org.apache.kylin.source.hive.GarbageCollectionStep step = new org.apache.kylin.source.hive.GarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+ step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+ step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
+ step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
+ jobFlow.addTask(step);
+ }
+
+ protected String getIntermediateTableIdentity() {
+ return flatTableDatabase + "." + flatDesc.getTableName();
+ }
+ }
+
+ // ===== static methods ======
+
protected static String getTableNameForHCat(TableDesc table, String uuid) {
String tableName = (table.isView()) ? table.getMaterializedName(uuid) : table.getName();
String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
@@ -58,15 +137,6 @@ public class HiveInputBase {
return String.format(Locale.ROOT, "%s.%s", database, tableName).toUpperCase(Locale.ROOT);
}
- protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow, String hdfsWorkingDir,
- IJoinedFlatTableDesc flatTableDesc, String flatTableDatabase) {
- final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
- jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatTableDesc));
- }
-
protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
String cubeName, IJoinedFlatTableDesc flatDesc) {
//from hive to hive
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index d6b85ed..df20b2c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,42 +28,23 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatSplit;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
public class HiveMRInput extends HiveInputBase implements IMRInput {
- private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
-
@Override
- public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- return new BatchCubingInputSide(flatDesc);
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ return new HiveMRBatchCubingInputSide(flatDesc);
}
@Override
- public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
- return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
- }
-
- @Override
- public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
return new IMRBatchMergeInputSide() {
@Override
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
@@ -72,6 +53,11 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
};
}
+ @Override
+ public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+ return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
+ }
+
public static class HiveTableInputFormat implements IMRTableInputFormat {
final String dbName;
final String tableName;
@@ -111,80 +97,15 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
}
}
- public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
- final protected IJoinedFlatTableDesc flatDesc;
- final protected String flatTableDatabase;
- final protected String hdfsWorkingDir;
-
- List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
- public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- this.flatDesc = flatDesc;
- this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
- this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
- }
-
- @Override
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
- final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- final KylinConfig cubeConfig = cubeInstance.getConfig();
-
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
- // create flat table first
- addStepPhase1_DoCreateFlatTable(jobFlow);
-
- // then count and redistribute
- if (cubeConfig.isHiveRedistributeEnabled()) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
- cubeInstance.getDescriptor()));
- }
+ public static class HiveMRBatchCubingInputSide extends BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
- // special for hive
- addStepPhase1_DoMaterializeLookupTable(jobFlow);
- }
-
- protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
- final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
- jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName, flatDesc));
- }
-
- protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
- AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
- flatDesc, hiveViewIntermediateTables, jobFlow.getId());
- if (task != null) {
- jobFlow.addTask(task);
- }
- }
-
- @Override
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
- org.apache.kylin.source.hive.GarbageCollectionStep step = new org.apache.kylin.source.hive.GarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
- step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
- step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
- step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
- jobFlow.addTask(step);
+ public HiveMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ super(flatDesc);
}
@Override
public IMRTableInputFormat getFlatTableInputFormat() {
- return new HiveTableInputFormat(getIntermediateTableIdentity());
- }
-
- private String getIntermediateTableIdentity() {
- return flatTableDatabase + "." + flatDesc.getTableName();
+ return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
}
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index d710db7..0660a66 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -18,39 +18,26 @@
package org.apache.kylin.source.hive;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
public class HiveSparkInput extends HiveInputBase implements ISparkInput {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HiveSparkInput.class);
@Override
- public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- return new BatchCubingInputSide(flatDesc);
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ return new SparkBatchCubingInputSide(flatDesc);
}
@Override
- public ISparkInput.ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
- return new ISparkInput.ISparkBatchMergeInputSide() {
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new ISparkBatchMergeInputSide() {
@Override
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
// doing nothing
@@ -58,67 +45,10 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
};
}
- public class BatchCubingInputSide implements ISparkBatchCubingInputSide {
-
- final protected IJoinedFlatTableDesc flatDesc;
- final protected String flatTableDatabase;
- final protected String hdfsWorkingDir;
-
- List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
- public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- this.flatDesc = flatDesc;
- this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
- this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
- }
-
- @Override
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
- final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- final KylinConfig cubeConfig = cubeInstance.getConfig();
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
- // create flat table first
- addStepPhase1_DoCreateFlatTable(jobFlow, hdfsWorkingDir, flatDesc, flatTableDatabase);
-
- // then count and redistribute
- if (cubeConfig.isHiveRedistributeEnabled()) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
- cubeInstance.getDescriptor()));
- }
-
- // special for hive
- addStepPhase1_DoMaterializeLookupTable(jobFlow);
- }
-
- protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
- AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
- flatDesc, hiveViewIntermediateTables, jobFlow.getId());
- if (task != null) {
- jobFlow.addTask(task);
- }
- }
-
- @Override
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
+ public static class SparkBatchCubingInputSide extends BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
- GarbageCollectionStep step = new GarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
- step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
- step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
- step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
- jobFlow.addTask(step);
- }
-
- private String getIntermediateTableIdentity() {
- return flatTableDatabase + "." + flatDesc.getTableName();
+ public SparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ super(flatDesc);
}
}
-
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
similarity index 93%
copy from source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
copy to source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
index 3460dd2..3769473 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
@@ -6,21 +6,18 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.jdbc;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.SourceConfigurationUtil;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -34,32 +31,28 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveMRInput;
+import org.apache.kylin.source.hive.HiveInputBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
-public class JdbcHiveMRInput extends HiveMRInput {
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
- private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+public class JdbcHiveInputBase extends HiveInputBase {
+ private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
private static final String MR_OVERRIDE_QUEUE_KEY = "mapreduce.job.queuename";
private static final String DEFAULT_QUEUE = "default";
- public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- return new BatchCubingInputSide(flatDesc);
- }
-
- public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
+ public static class JdbcBaseBatchCubingInputSide extends BaseBatchCubingInputSide {
- public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ public JdbcBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
super(flatDesc);
}
- private KylinConfig getConfig() {
+ protected KylinConfig getConfig() {
return flatDesc.getDataModel().getConfig();
}
@@ -73,6 +66,11 @@ public class JdbcHiveMRInput extends HiveMRInput {
jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
}
+ @Override
+ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+ // skip
+ }
+
private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
@@ -112,7 +110,7 @@ public class JdbcHiveMRInput extends HiveMRInput {
long maxCardinality = 0;
for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
- List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
+ List<TableExtDesc.ColumnStats> columnStatses = tableExtDesc.getColumnStats();
if (!columnStatses.isEmpty()) {
for (TblColRef colRef : tableRef.getColumns()) {
long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
@@ -175,14 +173,14 @@ public class JdbcHiveMRInput extends HiveMRInput {
String filedDelimiter = config.getJdbcSourceFieldDelimiter();
int mapperNum = config.getSqoopMapperNum();
- String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn,
+ String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM %s.%s as %s", splitColumn,
splitColumn, splitDatabase, splitTable, splitTableAlias);
if (partitionDesc.isPartitioned()) {
SegmentRange segRange = flatDesc.getSegRange();
if (segRange != null && !segRange.isInfinite()) {
if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
&& (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
- .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+ .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
flatDesc.getSegment(), segRange),
@@ -210,11 +208,6 @@ public class JdbcHiveMRInput extends HiveMRInput {
return step;
}
- @Override
- protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
- // skip
- }
-
protected String generateSqoopConfigArgString() {
KylinConfig kylinConfig = getConfig();
Map<String, String> config = Maps.newHashMap();
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 3460dd2..19f354c 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -17,222 +17,45 @@
*/
package org.apache.kylin.source.jdbc;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.SourceConfigurationUtil;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
-import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.hive.HiveMRInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-public class JdbcHiveMRInput extends HiveMRInput {
- private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
- private static final String MR_OVERRIDE_QUEUE_KEY = "mapreduce.job.queuename";
- private static final String DEFAULT_QUEUE = "default";
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
- public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- return new BatchCubingInputSide(flatDesc);
+ @Override
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ return new JdbcMRBatchCubingInputSide(flatDesc);
}
- public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide {
-
- public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- super(flatDesc);
- }
-
- private KylinConfig getConfig() {
- return flatDesc.getDataModel().getConfig();
- }
-
- @Override
- protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
- final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
- jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName));
- jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir));
- }
-
- private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
- final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
- String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
- // Sqoop does not support exporting SEQUENSEFILE to Hive now SQOOP-869
- final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
- "TEXTFILE", filedDelimiter);
-
- HiveCmdStep step = new HiveCmdStep();
- step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
- step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
- return step;
- }
-
- /**
- * Choose a better split-by column for sqoop. The strategy is:
- * 1. Prefer ClusteredBy column
- * 2. Prefer DistributedBy column
- * 3. Prefer Partition date column
- * 4. Prefer Higher cardinality column
- * 5. Prefer numeric column
- * 6. Pick a column at first glance
- * @return A column reference <code>TblColRef</code>for sqoop split-by
- */
- protected TblColRef determineSplitColumn() {
- if (null != flatDesc.getClusterBy()) {
- return flatDesc.getClusterBy();
- }
- if (null != flatDesc.getDistributedBy()) {
- return flatDesc.getDistributedBy();
- }
- PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
- if (partitionDesc.isPartitioned()) {
- return partitionDesc.getPartitionDateColumnRef();
- }
- TblColRef splitColumn = null;
- TableMetadataManager tblManager = TableMetadataManager.getInstance(getConfig());
- long maxCardinality = 0;
- for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
- TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc());
- List<ColumnStats> columnStatses = tableExtDesc.getColumnStats();
- if (!columnStatses.isEmpty()) {
- for (TblColRef colRef : tableRef.getColumns()) {
- long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
- .getCardinality();
- splitColumn = cardinality > maxCardinality ? colRef : splitColumn;
- }
- }
- }
- if (null == splitColumn) {
- for (TblColRef colRef : flatDesc.getAllColumns()) {
- if (colRef.getType().isIntegerFamily()) {
- return colRef;
- }
- }
- splitColumn = flatDesc.getAllColumns().get(0);
- }
-
- return splitColumn;
- }
-
- private String getSqoopJobQueueName(KylinConfig config) {
- Map<String, String> mrConfigOverride = config.getMRConfigOverride();
- if (mrConfigOverride.containsKey(MR_OVERRIDE_QUEUE_KEY)) {
- return mrConfigOverride.get(MR_OVERRIDE_QUEUE_KEY);
+ @Override
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new IMRBatchMergeInputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ // doing nothing
}
- return DEFAULT_QUEUE;
- }
-
- protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
- KylinConfig config = getConfig();
- PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
- String partCol = null;
-
- if (partitionDesc.isPartitioned()) {
- partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
- }
-
- String splitTable;
- String splitTableAlias;
- String splitColumn;
- String splitDatabase;
- TblColRef splitColRef = determineSplitColumn();
- splitTable = splitColRef.getTableRef().getTableName();
- splitTableAlias = splitColRef.getTableAlias();
- splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
- splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase();
-
- //using sqoop to extract data from jdbc source and dump them to hive
- String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
- selectSql = escapeQuotationInSql(selectSql);
-
-
-
- String hiveTable = flatDesc.getTableName();
- String connectionUrl = config.getJdbcSourceConnectionUrl();
- String driverClass = config.getJdbcSourceDriver();
- String jdbcUser = config.getJdbcSourceUser();
- String jdbcPass = config.getJdbcSourcePass();
- String sqoopHome = config.getSqoopHome();
- String filedDelimiter = config.getJdbcSourceFieldDelimiter();
- int mapperNum = config.getSqoopMapperNum();
+ };
+ }
- String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM \"%s\".%s as %s", splitColumn,
- splitColumn, splitDatabase, splitTable, splitTableAlias);
- if (partitionDesc.isPartitioned()) {
- SegmentRange segRange = flatDesc.getSegRange();
- if (segRange != null && !segRange.isInfinite()) {
- if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
- && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
- .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
- String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
- partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
- flatDesc.getSegment(), segRange),
- "`");
- bquery += " WHERE " + quotedPartCond;
- }
- }
- }
- bquery = escapeQuotationInSql(bquery);
+ @Override
+ public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+ return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, uuid));
+ }
- // escape ` in cmd
- splitColumn = escapeQuotationInSql(splitColumn);
+ public static class JdbcMRBatchCubingInputSide extends JdbcBaseBatchCubingInputSide implements IMRBatchCubingInputSide {
- String cmd = String.format(Locale.ROOT,
- "%s/bin/sqoop import" + generateSqoopConfigArgString()
- + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
- + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '' "
- + "--fields-terminated-by '%s' --num-mappers %d",
- sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable,
- splitColumn, bquery, filedDelimiter, mapperNum);
- logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd));
- CmdStep step = new CmdStep();
- step.setCmd(cmd);
- step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
- return step;
+ public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ super(flatDesc);
}
@Override
- protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
- // skip
- }
-
- protected String generateSqoopConfigArgString() {
- KylinConfig kylinConfig = getConfig();
- Map<String, String> config = Maps.newHashMap();
- config.put("mapreduce.job.queuename", getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
- config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
- config.putAll(kylinConfig.getSqoopConfigOverride());
-
- StringBuilder args = new StringBuilder(" -Dorg.apache.sqoop.splitter.allow_text_splitter=true ");
- for (Map.Entry<String, String> entry : config.entrySet()) {
- args.append(" -D" + entry.getKey() + "=" + entry.getValue() + " ");
- }
- return args.toString();
+ public IMRTableInputFormat getFlatTableInputFormat() {
+ return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
}
}
- protected static String escapeQuotationInSql(String sqlExpr) {
- sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
- sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
- return sqlExpr;
- }
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
new file mode 100644
index 0000000..8a8471a
--- /dev/null
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements ISparkInput {
+
+ @Override
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ return new JdbcSparkBatchCubingInputSide(flatDesc);
+ }
+
+ @Override
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new ISparkBatchMergeInputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ // doing nothing
+ }
+ };
+ }
+
+ public static class JdbcSparkBatchCubingInputSide extends JdbcBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+ public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ super(flatDesc);
+ }
+ }
+}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 3bf7498..1bda6c2 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
@@ -44,6 +45,8 @@ public class JdbcSource implements ISource {
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
if (engineInterface == IMRInput.class) {
return (I) new JdbcHiveMRInput();
+ } else if (engineInterface == ISparkInput.class) {
+ return (I) new JdbcHiveSparkInput();
} else {
throw new RuntimeException("Cannot adapt to " + engineInterface);
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
similarity index 90%
copy from source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
copy to source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
index 2e57a44..10eb31e 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kylin.source.jdbc.extensible;
import org.apache.hadoop.util.StringUtils;
@@ -34,23 +35,14 @@ import org.slf4j.LoggerFactory;
import java.util.Locale;
-public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInput {
- private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
-
- private final JdbcConnector dataSource;
+public class JdbcHiveInputBase extends org.apache.kylin.source.jdbc.JdbcHiveInputBase {
- JdbcHiveMRInput(JdbcConnector dataSource) {
- this.dataSource = dataSource;
- }
-
- public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- return new BatchCubingInputSide(flatDesc, dataSource);
- }
+ private static final Logger logger = LoggerFactory.getLogger(JdbcHiveInputBase.class);
- public static class BatchCubingInputSide extends org.apache.kylin.source.jdbc.JdbcHiveMRInput.BatchCubingInputSide {
+ public static class JDBCBaseBatchCubingInputSide extends JdbcBaseBatchCubingInputSide {
private final JdbcConnector dataSource;
- public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+ public JDBCBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
super(flatDesc);
this.dataSource = dataSource;
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
index 2e57a44..7df4ab5 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
@@ -17,25 +17,15 @@
*/
package org.apache.kylin.source.jdbc.extensible;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
-import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.source.hive.HiveMRInput;
-import java.util.Locale;
-
-public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInput {
- private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
private final JdbcConnector dataSource;
@@ -44,86 +34,34 @@ public class JdbcHiveMRInput extends org.apache.kylin.source.jdbc.JdbcHiveMRInpu
}
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
- return new BatchCubingInputSide(flatDesc, dataSource);
+ return new JdbcMRBatchCubingInputSide(flatDesc, dataSource);
}
- public static class BatchCubingInputSide extends org.apache.kylin.source.jdbc.JdbcHiveMRInput.BatchCubingInputSide {
- private final JdbcConnector dataSource;
-
- public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
- super(flatDesc);
- this.dataSource = dataSource;
- }
-
- protected JdbcConnector getDataSource() {
- return dataSource;
- }
-
- @Override
- protected AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
- KylinConfig config = flatDesc.getDataModel().getConfig();
- PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc();
- String partCol = null;
-
- if (partitionDesc.isPartitioned()) {
- partCol = partitionDesc.getPartitionDateColumn(); //tablename.colname
+ @Override
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new IMRBatchMergeInputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ // doing nothing
}
+ };
+ }
- String splitTable;
- String splitTableAlias;
- String splitColumn;
- String splitDatabase;
- TblColRef splitColRef = determineSplitColumn();
- splitTable = splitColRef.getTableRef().getTableName();
- splitTable = splitColRef.getTableRef().getTableDesc().getName();
- splitTableAlias = splitColRef.getTableAlias();
- //to solve case sensitive if necessary
- splitColumn = JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
- splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase().toLowerCase(Locale.ROOT);
-
- //using sqoop to extract data from jdbc source and dump them to hive
- String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
- selectSql = escapeQuotationInSql(dataSource.convertSql(selectSql));
-
- String hiveTable = flatDesc.getTableName();
- String sqoopHome = config.getSqoopHome();
- String filedDelimiter = config.getJdbcSourceFieldDelimiter();
- int mapperNum = config.getSqoopMapperNum();
-
- String bquery = String.format(Locale.ROOT, "SELECT min(%s), max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
- splitDatabase, splitTable, splitTableAlias);
- bquery = dataSource.convertSql(bquery);
- if (partitionDesc.isPartitioned()) {
- SegmentRange segRange = flatDesc.getSegRange();
- if (segRange != null && !segRange.isInfinite()) {
- if (partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
- && (partitionDesc.getPartitionTimeColumnRef() == null || partitionDesc
- .getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
- String quotedPartCond = FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
- partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
- flatDesc.getSegment(), segRange),
- "`");
- bquery += " WHERE " + quotedPartCond;
- }
- }
- }
- bquery = escapeQuotationInSql(bquery);
+ @Override
+ public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+ return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, uuid));
+ }
- splitColumn = escapeQuotationInSql(dataSource.convertColumn(splitColumn, FlatTableSqlQuoteUtils.QUOTE));
+ public static class JdbcMRBatchCubingInputSide extends JDBCBaseBatchCubingInputSide implements IMRInput.IMRBatchCubingInputSide {
- String cmd = StringUtils.format(
- "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
- + "--target-dir %s/%s --split-by %s --boundary-query \"%s\" --null-string '' "
- + "--fields-terminated-by '%s' --num-mappers %d",
- dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), dataSource.getJdbcUser(),
- dataSource.getJdbcPassword(), selectSql, jobWorkingDir, hiveTable, splitColumn, bquery,
- filedDelimiter, mapperNum);
- logger.debug("sqoop cmd: {}", cmd);
+ public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+ super(flatDesc, dataSource);
+ }
- SqoopCmdStep step = new SqoopCmdStep();
- step.setCmd(cmd);
- step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
- return step;
+ @Override
+ public IMRInput.IMRTableInputFormat getFlatTableInputFormat() {
+ return new HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
}
}
+
}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
new file mode 100644
index 0000000..a5701ad
--- /dev/null
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.extensible;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements ISparkInput {
+
+ private final JdbcConnector dataSource;
+
+ JdbcHiveSparkInput(JdbcConnector dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ @Override
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ return new JdbcSparkBatchCubingInputSide(flatDesc, dataSource);
+ }
+
+ @Override
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new ISparkBatchMergeInputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ // doing nothing
+ }
+ };
+ }
+
+ public static class JdbcSparkBatchCubingInputSide extends JDBCBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+ public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, JdbcConnector dataSource) {
+ super(flatDesc, dataSource);
+ }
+ }
+}
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
index 3e8f0fd..da055e1 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
@@ -60,6 +61,8 @@ public class JdbcSource implements ISource {
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
if (engineInterface == IMRInput.class) {
return (I) new JdbcHiveMRInput(dataSource);
+ } else if (engineInterface == ISparkInput.class) {
+ return (I) new JdbcHiveSparkInput(dataSource);
} else {
throw new RuntimeException("Cannot adapt to " + engineInterface);
}
diff --git a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
index 956f86c..20c37ef 100644
--- a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
+++ b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
@@ -60,7 +60,7 @@ public class JdbcHiveMRInputTest extends TestBase {
CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
new SegmentRange.TSRange(System.currentTimeMillis() - 100L, System.currentTimeMillis() + 100L));
CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
- JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+ JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
.getBatchCubingInputSide(flatDesc);
AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -86,7 +86,7 @@ public class JdbcHiveMRInputTest extends TestBase {
CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
new SegmentRange.TSRange(0L, Long.MAX_VALUE));
CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
- JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+ JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
.getBatchCubingInputSide(flatDesc);
AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -111,7 +111,7 @@ public class JdbcHiveMRInputTest extends TestBase {
CubeSegment seg = cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
new SegmentRange.TSRange(System.currentTimeMillis() - 100L, System.currentTimeMillis() + 100L));
CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
- JdbcHiveMRInput.BatchCubingInputSide inputSide = (JdbcHiveMRInput.BatchCubingInputSide) input
+ JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = (JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
.getBatchCubingInputSide(flatDesc);
AbstractExecutable executable = new MockInputSide(flatDesc, inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -127,10 +127,10 @@ public class JdbcHiveMRInputTest extends TestBase {
source.close();
}
- private static class MockInputSide extends JdbcHiveMRInput.BatchCubingInputSide {
- JdbcHiveMRInput.BatchCubingInputSide input;
+ private static class MockInputSide extends JdbcHiveMRInput.JdbcMRBatchCubingInputSide {
+ JdbcHiveMRInput.JdbcMRBatchCubingInputSide input;
- public MockInputSide(IJoinedFlatTableDesc flatDesc, JdbcHiveMRInput.BatchCubingInputSide input) {
+ public MockInputSide(IJoinedFlatTableDesc flatDesc, JdbcHiveMRInput.JdbcMRBatchCubingInputSide input) {
super(flatDesc, input.getDataSource());
this.input = input;
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index cb2e14c..7620ab3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -20,18 +20,23 @@ package org.apache.kylin.source.kafka;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Set;
-import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
@@ -45,8 +50,78 @@ import org.apache.kylin.source.hive.GarbageCollectionStep;
import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
import org.apache.kylin.source.kafka.job.MergeOffsetStep;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
public class KafkaInputBase {
+ public static class BaseBatchCubingInputSide implements IInput.IBatchCubingInputSide {
+
+ final JobEngineConfig conf;
+ final CubeSegment seg;
+ private CubeDesc cubeDesc;
+ private KylinConfig config;
+ protected IJoinedFlatTableDesc flatDesc;
+ protected String hiveTableDatabase;
+ final private List<String> intermediateTables = Lists.newArrayList();
+ final private List<String> intermediatePaths = Lists.newArrayList();
+ private String cubeName;
+
+ public BaseBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+ this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+ this.config = seg.getConfig();
+ this.flatDesc = flatDesc;
+ this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
+ this.seg = seg;
+ this.cubeDesc = seg.getCubeDesc();
+ this.cubeName = seg.getCubeInstance().getName();
+ }
+
+ @Override
+ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+
+ boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
+ final String baseLocation = getJobWorkingDir(jobFlow);
+ if (onlyOneTable) {
+ // directly use flat table location
+ final String intermediateFactTable = flatDesc.getTableName();
+ final String tableLocation = baseLocation + "/" + intermediateFactTable;
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
+ intermediatePaths.add(tableLocation);
+ } else {
+ final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
+ + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
+ jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
+ jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
+ flatDesc, intermediateTables, intermediatePaths));
+ }
+ }
+
+ protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+ return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+
+ }
+ }
+
+ public static class BaseBatchMergeInputSide implements IInput.IBatchMergeInputSide {
+
+ private CubeSegment cubeSegment;
+
+ BaseBatchMergeInputSide(CubeSegment cubeSegment) {
+ this.cubeSegment = cubeSegment;
+ }
+
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+ }
+ }
+
protected static AbstractExecutable createMergeOffsetStep(String jobId, CubeSegment cubeSegment) {
final MergeOffsetStep result = new MergeOffsetStep();
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 1c94f9c..d709572 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -20,8 +20,6 @@ package org.apache.kylin.source.kafka;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -30,46 +28,36 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
public class KafkaMRInput extends KafkaInputBase implements IMRInput {
- private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
private CubeSegment cubeSegment;
@Override
- public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
this.cubeSegment = (CubeSegment) flatDesc.getSegment();
- return new BatchCubingInputSide(cubeSegment, flatDesc);
+ return new KafkaMRBatchCubingInputSide(cubeSegment, flatDesc);
}
@Override
- public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
-
- return new KafkaTableInputFormat(cubeSegment, null);
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new KafkaMRBatchMergeInputSide((CubeSegment)seg);
}
@Override
- public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
- return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
+ public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+ return new KafkaTableInputFormat(cubeSegment, null);
}
public static class KafkaTableInputFormat implements IMRTableInputFormat {
@@ -110,56 +98,10 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
}
}
- public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
-
- final JobEngineConfig conf;
- final CubeSegment seg;
- private CubeDesc cubeDesc;
- private KylinConfig config;
- protected IJoinedFlatTableDesc flatDesc;
- protected String hiveTableDatabase;
- private List<String> intermediateTables = Lists.newArrayList();
- private List<String> intermediatePaths = Lists.newArrayList();
- private String cubeName;
-
- public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
- this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- this.config = seg.getConfig();
- this.flatDesc = flatDesc;
- this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
- this.seg = seg;
- this.cubeDesc = seg.getCubeDesc();
- this.cubeName = seg.getCubeInstance().getName();
- }
-
- @Override
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-
- boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
- final String baseLocation = getJobWorkingDir(jobFlow);
- if (onlyOneTable) {
- // directly use flat table location
- final String intermediateFactTable = flatDesc.getTableName();
- final String tableLocation = baseLocation + "/" + intermediateFactTable;
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
- intermediatePaths.add(tableLocation);
- } else {
- final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
- + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
- jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
- flatDesc, intermediateTables, intermediatePaths));
- }
- }
-
- protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
- return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
- }
-
- @Override
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+ public static class KafkaMRBatchCubingInputSide extends BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
+ public KafkaMRBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+ super(seg, flatDesc);
}
@Override
@@ -168,18 +110,10 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
}
}
- class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide {
-
- private CubeSegment cubeSegment;
+ public static class KafkaMRBatchMergeInputSide extends BaseBatchMergeInputSide implements IMRBatchMergeInputSide {
KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
- this.cubeSegment = cubeSegment;
- }
-
- @Override
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+ super(cubeSegment);
}
}
-
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
index 7db6c32..edbc002 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -17,105 +17,37 @@
*/
package org.apache.kylin.source.kafka;
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
- private static final Logger logger = LoggerFactory.getLogger(KafkaSparkInput.class);
private CubeSegment cubeSegment;
@Override
- public ISparkInput.ISparkBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
this.cubeSegment = (CubeSegment) flatDesc.getSegment();
- return new BatchCubingInputSide(cubeSegment, flatDesc);
+ return new KafkaSparkBatchCubingInputSide(cubeSegment, flatDesc);
}
@Override
- public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
- return new KafkaSparkBatchMergeInputSide((CubeSegment) seg);
+ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+ return new KafkaSparkBatchMergeInputSide((CubeSegment)seg);
}
- public static class BatchCubingInputSide implements ISparkBatchCubingInputSide {
-
- final JobEngineConfig conf;
- final CubeSegment seg;
- private CubeDesc cubeDesc;
- private KylinConfig config;
- protected IJoinedFlatTableDesc flatDesc;
- protected String hiveTableDatabase;
- final private List<String> intermediateTables = Lists.newArrayList();
- final private List<String> intermediatePaths = Lists.newArrayList();
- private String cubeName;
-
- public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
- this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- this.config = seg.getConfig();
- this.flatDesc = flatDesc;
- this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable();
- this.seg = seg;
- this.cubeDesc = seg.getCubeDesc();
- this.cubeName = seg.getCubeInstance().getName();
- }
-
- @Override
- public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
-
- boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0;
- final String baseLocation = getJobWorkingDir(jobFlow);
- if (onlyOneTable) {
- // directly use flat table location
- final String intermediateFactTable = flatDesc.getTableName();
- final String tableLocation = baseLocation + "/" + intermediateFactTable;
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation, seg));
- intermediatePaths.add(tableLocation);
- } else {
- final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
- + cubeName.toLowerCase(Locale.ROOT) + "_" + seg.getUuid().replaceAll("-", "_") + "_fact";
- jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName, seg));
- jobFlow.addTask(createFlatTable(hiveTableDatabase, mockFactTableName, baseLocation, cubeName, cubeDesc,
- flatDesc, intermediateTables, intermediatePaths));
- }
- }
-
- protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
- return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
- }
-
- @Override
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(createGCStep(intermediateTables, intermediatePaths));
+ public static class KafkaSparkBatchCubingInputSide extends BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+ public KafkaSparkBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) {
+ super(seg, flatDesc);
}
}
- class KafkaSparkBatchMergeInputSide implements ISparkBatchMergeInputSide {
-
- private CubeSegment cubeSegment;
+ public static class KafkaSparkBatchMergeInputSide extends BaseBatchMergeInputSide implements ISparkBatchMergeInputSide {
KafkaSparkBatchMergeInputSide(CubeSegment cubeSegment) {
- this.cubeSegment = cubeSegment;
- }
-
- @Override
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), cubeSegment));
+ super(cubeSegment);
}
}
-
}