Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/11 08:19:04 UTC
[kylin] 02/02: KYLIN-3680 Spark cubing failed with JDBC data source
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit cddb493884e624c12c3452bad4de7482fc37de8d
Author: chao long <wa...@qq.com>
AuthorDate: Mon Dec 10 13:05:27 2018 +0800
KYLIN-3680 Spark cubing failed with JDBC data source
---
.../org/apache/kylin/common/util/HadoopUtil.java | 24 +++++++++
.../org/apache/kylin/engine/spark/SparkUtil.java | 63 ++++++++++++----------
2 files changed, 60 insertions(+), 27 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index f3123a2..5d09ea7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -236,4 +236,28 @@ public class HadoopUtil {
         return readFromSequenceFile(getCurrentConfiguration(), inputPath);
     }
 
+    public static boolean isSequenceFile(Configuration conf, Path filePath) {
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), filePath, conf)) {
+            return true;
+        } catch (Exception e) {
+            logger.warn("Read sequence file {} failed.", filePath.getName(), e);
+            return false;
+        }
+    }
+
+    public static boolean isSequenceDir(Configuration conf, Path fileDir) throws IOException {
+        FileSystem fs = getWorkingFileSystem(conf);
+        FileStatus[] fileStatuses = fs.listStatus(fileDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return !"_SUCCESS".equals(path.getName());
+            }
+        });
+
+        if (fileStatuses != null && fileStatuses.length > 0) {
+            return isSequenceFile(conf, fileStatuses[0].getPath());
+        }
+
+        return false;
+    }
 }
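
For readers following along, a minimal sketch of how the new helpers are meant to be used (the flat-table path below is a hypothetical stand-in, not taken from the commit): isSequenceDir() probes the first non-_SUCCESS file in the directory and reports whether it opens as a Hadoop SequenceFile, so callers can pick the matching read path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;

public class SequenceDirProbe {
    public static void main(String[] args) throws Exception {
        Configuration conf = HadoopUtil.getCurrentConfiguration();
        // Hypothetical flat-table directory; substitute a real job working dir.
        Path flatTableDir = new Path("/kylin/flat_table/kylin_intermediate");
        if (HadoopUtil.isSequenceDir(conf, flatTableDir)) {
            System.out.println("sequence files found: read with sc.sequenceFile()");
        } else {
            // e.g. a JDBC data source that materialized the table as plain text
            System.out.println("not a sequence dir: read through SparkSession/Hive");
        }
    }
}
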
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 1c4086d..151103a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
@@ -142,39 +143,47 @@ public class SparkUtil {
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
     }
 
-    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) {
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, JavaSparkContext sc, String inputPath, String hiveTable) throws IOException {
         JavaRDD<String[]> recordRDD;
 
-        if (isSequenceFile) {
-            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, text.getLength());
-                            return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    });
+        if (isSequenceFile && HadoopUtil.isSequenceDir(sc.hadoopConfiguration(), new Path(inputPath))) {
+            recordRDD = getSequenceFormatHiveInput(sc, inputPath);
         } else {
-            SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            recordRDD = intermediateTable.javaRDD().map(new Function<Row, String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            });
+            recordRDD = getOtherFormatHiveInput(sc, hiveTable);
         }
 
         return recordRDD;
     }
 
+    private static JavaRDD<String[]> getSequenceFormatHiveInput(JavaSparkContext sc, String inputPath) {
+        return sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                .map(new Function<Text, String[]>() {
+                    @Override
+                    public String[] call(Text text) throws Exception {
+                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                        return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                    }
+                });
+    }
+
+    private static JavaRDD<String[]> getOtherFormatHiveInput(JavaSparkContext sc, String hiveTable) {
+        SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+        final Dataset intermediateTable = sparkSession.table(hiveTable);
+        return intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+            @Override
+            public String[] call(Row row) throws Exception {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
+                    }
+                }
+                return result;
+            }
+        });
+    }
+
 }
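
With that helper in place, hiveRecordInputRDD() now dispatches on the actual on-disk format rather than trusting the isSequenceFile flag alone, which is what fixes the JDBC case. A minimal caller sketch, assuming a local master plus hypothetical path and table names (none of these values come from the commit):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.kylin.engine.spark.SparkUtil;

public class FlatTableReadDemo {
    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "flat-table-read"); // assumed master/app name
        // Even with isSequenceFile == true, the call falls back to the Hive reader
        // when the directory holds no sequence files (the JDBC data source case).
        JavaRDD<String[]> records = SparkUtil.hiveRecordInputRDD(true, sc,
                "/kylin/flat_table/kylin_intermediate",  // hypothetical HDFS path
                "default.kylin_intermediate");           // hypothetical Hive table
        System.out.println("records: " + records.count());
        sc.stop();
    }
}
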