You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/10 09:31:18 UTC

[kylin] branch master updated: KYLIN-3427 Bug fix for covnerting to HFile in Spark

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new a565fda  KYLIN-3427 Bug fix for covnerting to HFile in Spark
a565fda is described below

commit a565fdae47a818ff0062de4ac6cbdde733cd6bf5
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Jul 10 17:29:20 2018 +0800

    KYLIN-3427 Bug fix for covnerting to HFile in Spark
---
 .../src/main/java/org/apache/kylin/cube/CubeManager.java    |  2 +-
 .../java/org/apache/kylin/job/common/PatternedLogger.java   |  7 +++----
 .../java/org/apache/kylin/engine/spark/SparkExecutable.java | 13 +++++++++----
 .../apache/kylin/storage/hbase/steps/SparkCubeHFile.java    |  4 +++-
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3f4c576..3ff0160 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -801,7 +801,7 @@ public class CubeManager implements IRealizationProvider {
             if (force == false) {
                 List<String> emptySegment = Lists.newArrayList();
                 for (CubeSegment seg : mergingSegments) {
-                    if (seg.getSizeKB() == 0) {
+                    if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) {
                         emptySegment.add(seg.getName());
                     }
                 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 99a1aa9..73e7c56 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -34,18 +34,18 @@ import com.google.common.collect.Maps;
  */
 public class PatternedLogger extends BufferedLogger {
     private final Map<String, String> info = Maps.newHashMap();
-    ILogListener listener = null;
+    private ILogListener listener = null;
 
     private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
     private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
     private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
     private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
     private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
-    private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+    private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write");
 
     // hive
     private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
-    private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+    private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write: (\\d+) SUCCESS");
 
     private static final Pattern PATTERN_HIVE_APP_ID_URL_2 = Pattern.compile("Executing on YARN cluster with App id  (.*?)");
 
@@ -53,7 +53,6 @@ public class PatternedLogger extends BufferedLogger {
     private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
     private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
 
-
     private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
 
     static {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1c64119..90442a4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -55,7 +55,7 @@ public class SparkExecutable extends AbstractExecutable {
     private static final String CLASS_NAME = "className";
     private static final String JARS = "jars";
     private static final String JOB_ID = "jobId";
-    private String counter_save_as;
+    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
@@ -70,7 +70,11 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     public void setCounterSaveAs(String value) {
-        counter_save_as = value;
+        this.setParam(COUNTER_SAVE_AS, value);
+    }
+
+    public String getCounterSaveAs() {
+        return getParam(COUNTER_SAVE_AS);
     }
 
     private String formatArgs() {
@@ -80,7 +84,7 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
-            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)) {
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) || entry.getKey().equals(COUNTER_SAVE_AS)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -160,7 +164,7 @@ public class SparkExecutable extends AbstractExecutable {
                 public void onLogEvent(String infoKey, Map<String, String> info) {
                     // only care two properties here
                     if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)) {
+                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
                         getManager().addJobInfo(getId(), info);
                     }
                 }
@@ -219,6 +223,7 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     private void readCounters(final Map<String, String> info) {
+        String counter_save_as = getCounterSaveAs();
         if (counter_save_as != null) {
             String[] saveAsNames = counter_save_as.split(",");
             saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index b2571ae..a23156c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -239,8 +239,10 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
                 HFileOutputFormat2.class, job.getConfiguration());
 
+        // output the data size to console, job engine will parse and save the metric
+        // please note: this mechanism won't work when spark.submit.deployMode=cluster
         System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        // deleteHDFSMeta(metaUrl);
+        deleteHDFSMeta(metaUrl);
     }
 
     private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {