Posted to dev@kylin.apache.org by GitBox <gi...@apache.org> on 2018/08/28 02:30:29 UTC

[GitHub] shaofengshi closed pull request #210: KYLIN-3477 Save spark job counter to hdfs

URL: https://github.com/apache/kylin/pull/210

This is a pull request merged from a forked repository. GitHub hides the
original diff from a fork once the PR is merged, so it is reproduced below
for the sake of provenance:

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3cb43fc4c9..3aef34ad1e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -30,12 +31,17 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 public class HadoopUtil {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
@@ -193,4 +199,34 @@ public static void deleteHDFSMeta(String metaUrl) throws IOException {
         HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
         logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
     }
+
+    @SuppressWarnings("deprecation")
+    public static void writeToSequenceFile(Configuration conf, String outputPath, Map<String, String> counterMap) throws IOException {
+        try (SequenceFile.Writer writer = SequenceFile.createWriter(getWorkingFileSystem(conf), conf, new Path(outputPath), Text.class, Text.class)) {
+            for (Map.Entry<String, String> counterEntry : counterMap.entrySet()) {
+                writer.append(new Text(counterEntry.getKey()), new Text(counterEntry.getValue()));
+            }
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public static Map<String, String> readFromSequenceFile(Configuration conf, String inputPath) throws IOException {
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), new Path(inputPath), conf)) {
+            Map<String, String> map = Maps.newHashMap();
+
+            Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            while (reader.next(key, value)) {
+                map.put(key.toString(), value.toString());
+            }
+
+            return map;
+        }
+    }
+
+    public static Map<String, String> readFromSequenceFile(String inputPath) throws IOException {
+        return readFromSequenceFile(getCurrentConfiguration(), inputPath);
+    }
+
 }
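
The two helpers above give the job engine a durable, filesystem-backed
channel for Spark counters: the driver writes a small String-to-String map
as a Text/Text SequenceFile, and the job engine reads it back once the
application has finished. A minimal round-trip sketch, assuming a reachable
HDFS or local filesystem (the path and counter key are illustrative, not
taken from the diff):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.kylin.common.util.HadoopUtil;
    import com.google.common.collect.Maps;

    public class CounterRoundTripDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = HadoopUtil.getCurrentConfiguration();
            String path = "/tmp/kylin-counter-demo/counter"; // illustrative

            // write the counter map as a Text/Text SequenceFile
            Map<String, String> written = Maps.newHashMap();
            written.put("source.records", "12345"); // arbitrary demo counter
            HadoopUtil.writeToSequenceFile(conf, path, written);

            // read it back; the one-arg overload reuses the current configuration
            Map<String, String> read = HadoopUtil.readFromSequenceFile(path);
            System.out.println(read.get("source.records")); // prints 12345
        }
    }
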
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 02e9fe550f..5b1f38c51c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -334,7 +334,11 @@ public String getOptimizationCuboidPath(String jobId) {
     }
 
     public String getHBaseConfFilePath(String jobId) {
-       return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+        return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+    }
+
+    public String getCounterOuputPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/counter";
     }
 
     // ============================================================================
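
Note that getCounterOuputPath (spelling as in the diff) anchors the counter
file under the realization root path rather than the per-job working
directory, so with Kylin's usual layout the file should land at roughly
(shape inferred, not stated in the diff):

    <hdfs-working-dir>/kylin-<jobId>/<realization-name>/counter
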
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 8c2ba7f33d..66da1b266a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -106,6 +106,8 @@
     String ARG_META_URL = "metadataUrl";
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
+    String ARG_COUNTER_OUPUT = "counterOutput";
+
     /**
      * logger and counter
      */
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 5fd7213b2b..3f3c14da2a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -103,7 +103,7 @@ public SparkExecutable createFactDistinctColumnsSparkStep(String jobId) {
 
         sparkExecutable.setJobId(jobId);
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOuputPath(jobId));
 
         StringBuilder jars = new StringBuilder();
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 612239741f..d85337e624 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -35,11 +35,13 @@
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -80,6 +82,11 @@ public void setCounterSaveAs(String value) {
         this.setParam(COUNTER_SAVE_AS, value);
     }
 
+    public void setCounterSaveAs(String value, String counterOutputPath) {
+        this.setParam(COUNTER_SAVE_AS, value);
+        this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+    }
+
     public String getCounterSaveAs() {
         return getParam(COUNTER_SAVE_AS);
     }
@@ -286,6 +293,14 @@ public void onLogEvent(String infoKey, Map<String, String> info) {
                 }
                 // done, update all properties
                 Map<String, String> joblogInfo = patternedLogger.getInfo();
+
+                // read counter from hdfs
+                String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+                if (counterOutput != null){
+                    Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+                    joblogInfo.putAll(counterMap);
+                }
+
                 readCounters(joblogInfo);
                 getManager().addJobInfo(getId(), joblogInfo);
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 61e2e534e8..6188a56e1f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -73,6 +73,7 @@
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
 import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
@@ -119,6 +120,8 @@
             .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
@@ -131,6 +134,7 @@ public SparkFactDistinct() {
         options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+        options.addOption(OPTION_COUNTER_PATH);
     }
 
     @Override
@@ -146,6 +150,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
         int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
@@ -173,6 +178,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         logger.info("RDD Output path: {}", outputPath);
         logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
         logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+        logger.info("counter path {}", counterPath);
 
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
@@ -202,13 +208,20 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
 
         multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
 
-        // only work for client mode, not work when spark.submit.deployMode=cluster
         logger.info("Map input records={}", recordRDD.count());
         logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
 
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordRDD.count()));
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+
         HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
+
     static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
         private volatile transient boolean initialized = false;
         private String cubeName;
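
One detail worth flagging in the execute() hunk above: recordRDD.count() is
a Spark action and is now invoked twice (once for the log line, once for the
counter map), so the lineage may be recomputed unless the RDD is cached. A
hedged sketch of how the step could compute it once (this refactor is not
part of the diff):

    long recordCount = recordRDD.count(); // single action
    logger.info("Map input records={}", recordCount);
    logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());

    Map<String, String> counterMap = Maps.newHashMap();
    counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT,
            String.valueOf(recordCount));
    counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE,
            String.valueOf(bytesWritten.value()));

    // save counters to HDFS, as in the diff
    HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
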
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index be230f011e..ccab22f878 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -71,7 +71,7 @@ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
         sparkExecutable.setJars(jars.toString());
 
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOuputPath(jobId));
 
         return sparkExecutable;
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index c87a739b9a..539f03b23e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -23,8 +23,10 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -55,6 +57,7 @@
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.measure.MeasureCodec;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
@@ -88,6 +91,8 @@
             .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
             .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
@@ -100,6 +105,7 @@ public SparkCubeHFile() {
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+        options.addOption(OPTION_COUNTER_PATH);
     }
 
     @Override
@@ -116,6 +122,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
         final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+        final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
                 KeyValue.class, RowKeyWritable.class };
@@ -226,9 +233,14 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
                     }
                 }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 
-        // output the data size to console, job engine will parse and save the metric
-        // please note: this mechanism won't work when spark.submit.deployMode=cluster
         logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+
         //HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services