You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/10 09:31:18 UTC
[kylin] branch master updated: KYLIN-3427 Bug fix for converting to
HFile in Spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a565fda KYLIN-3427 Bug fix for converting to HFile in Spark
a565fda is described below
commit a565fdae47a818ff0062de4ac6cbdde733cd6bf5
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jul 10 17:29:20 2018 +0800
KYLIN-3427 Bug fix for converting to HFile in Spark
---
.../src/main/java/org/apache/kylin/cube/CubeManager.java | 2 +-
.../java/org/apache/kylin/job/common/PatternedLogger.java | 7 +++----
.../java/org/apache/kylin/engine/spark/SparkExecutable.java | 13 +++++++++----
.../apache/kylin/storage/hbase/steps/SparkCubeHFile.java | 4 +++-
4 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3f4c576..3ff0160 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -801,7 +801,7 @@ public class CubeManager implements IRealizationProvider {
if (force == false) {
List<String> emptySegment = Lists.newArrayList();
for (CubeSegment seg : mergingSegments) {
- if (seg.getSizeKB() == 0) {
+ if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) {
emptySegment.add(seg.getName());
}
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 99a1aa9..73e7c56 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -34,18 +34,18 @@ import com.google.common.collect.Maps;
*/
public class PatternedLogger extends BufferedLogger {
private final Map<String, String> info = Maps.newHashMap();
- ILogListener listener = null;
+ private ILogListener listener = null;
private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+ private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write");
// hive
private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
- private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+ private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write: (\\d+) SUCCESS");
private static final Pattern PATTERN_HIVE_APP_ID_URL_2 = Pattern.compile("Executing on YARN cluster with App id (.*?)");
@@ -53,7 +53,6 @@ public class PatternedLogger extends BufferedLogger {
private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
-
private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
static {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1c64119..90442a4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -55,7 +55,7 @@ public class SparkExecutable extends AbstractExecutable {
private static final String CLASS_NAME = "className";
private static final String JARS = "jars";
private static final String JOB_ID = "jobId";
- private String counter_save_as;
+ private static final String COUNTER_SAVE_AS = "CounterSaveAs";
public void setClassName(String className) {
this.setParam(CLASS_NAME, className);
@@ -70,7 +70,11 @@ public class SparkExecutable extends AbstractExecutable {
}
public void setCounterSaveAs(String value) {
- counter_save_as = value;
+ this.setParam(COUNTER_SAVE_AS, value);
+ }
+
+ public String getCounterSaveAs() {
+ return getParam(COUNTER_SAVE_AS);
}
private String formatArgs() {
@@ -80,7 +84,7 @@ public class SparkExecutable extends AbstractExecutable {
tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
if (entry.getKey().equals(CLASS_NAME)) {
stringBuilder.insert(0, tmp);
- } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)) {
+ } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) || entry.getKey().equals(COUNTER_SAVE_AS)) {
// JARS is for spark-submit, not for app
continue;
} else {
@@ -160,7 +164,7 @@ public class SparkExecutable extends AbstractExecutable {
public void onLogEvent(String infoKey, Map<String, String> info) {
// only care two properties here
if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
- || ExecutableConstants.YARN_APP_ID.equals(infoKey)) {
+ || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
getManager().addJobInfo(getId(), info);
}
}
@@ -219,6 +223,7 @@ public class SparkExecutable extends AbstractExecutable {
}
private void readCounters(final Map<String, String> info) {
+ String counter_save_as = getCounterSaveAs();
if (counter_save_as != null) {
String[] saveAsNames = counter_save_as.split(",");
saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index b2571ae..a23156c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -239,8 +239,10 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
HFileOutputFormat2.class, job.getConfiguration());
+ // output the data size to console, job engine will parse and save the metric
+ // please note: this mechanism won't work when spark.submit.deployMode=cluster
System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
- // deleteHDFSMeta(metaUrl);
+ deleteHDFSMeta(metaUrl);
}
private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {