You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/19 02:20:31 UTC

[kylin] branch master updated: KYLIN-3619 Some job won't clean up temp directory after finished

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 30c484c  KYLIN-3619 Some job won't clean up temp directory after finished
30c484c is described below

commit 30c484c33c2ddc728e4bc522dc00c97f7999c4b8
Author: Enwei Jiao <en...@kyligence.io>
AuthorDate: Fri Oct 19 09:35:03 2018 +0800

    KYLIN-3619 Some job won't clean up temp directory after finished
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +
 .../common/persistence/AutoDeleteDirectory.java    | 58 +++++++++++++
 .../persistence/AutoDeleteDirectoryTest.java       | 39 +++++++++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  | 35 ++++----
 .../kylin/engine/mr/common/JobRelatedMetaUtil.java | 38 +++++----
 .../kylin/rest/controller/DiagnosisController.java | 24 +++---
 .../kylin/rest/service/DiagnosisService.java       | 11 +--
 .../hive/cardinality/HiveColumnCardinalityJob.java | 97 +++++++++++-----------
 8 files changed, 205 insertions(+), 101 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e54d722..5577307 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -103,6 +103,14 @@ abstract public class KylinConfigBase implements Serializable {
         return getKylinHome() + File.separator + "spark";
     }
 
+    /**
+     * Returns the JVM's default temporary-file directory, i.e. the {@code java.io.tmpdir} system
+     * property, or {@code null} if that property is not set.
+     */
+    public static String getTempDir() {
+        return System.getProperty("java.io.tmpdir");
+    }
+
     // backward compatibility check happens when properties is loaded or updated
     static BackwardCompatibilityConfig BCC = new BackwardCompatibilityConfig();
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java b/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java
new file mode 100644
index 0000000..a496ba8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * A directory that {@link #close()} deletes together with everything inside it. Meant to be used
+ * with try-with-resources so that temporary files are removed on every exit path.
+ */
+public class AutoDeleteDirectory implements Closeable {
+
+    private final File tempFile;
+
+    /**
+     * Wraps the given path. Nothing is created on disk; {@link #close()} deletes the path if it
+     * still exists at that time.
+     *
+     * @param file the directory to manage
+     */
+    public AutoDeleteDirectory(File file) {
+        tempFile = file;
+    }
+
+    /**
+     * Creates a new, empty directory in the default temporary-file directory, named the way
+     * {@link File#createTempFile(String, String)} names files.
+     *
+     * @param prefix name prefix; must be at least three characters long
+     * @param suffix name suffix; {@code null} means ".tmp"
+     * @throws UncheckedIOException if the directory cannot be created
+     */
+    public AutoDeleteDirectory(String prefix, String suffix) {
+        try {
+            tempFile = File.createTempFile(prefix, suffix);
+            FileUtils.forceDelete(tempFile); // we need a directory, so delete the file first
+            // mkdir() returns false if the name was claimed again in between; fail rather than share it
+            if (!tempFile.mkdir()) {
+                throw new IOException("cannot create directory " + tempFile.getAbsolutePath());
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException("create temp file " + prefix + "****" + suffix + " failed", e);
+        }
+    }
+
+    /** Returns the absolute path of the managed directory. */
+    public String getAbsolutePath() {
+        return tempFile.getAbsolutePath();
+    }
+
+    /**
+     * Returns a handle on the sub-directory {@code child}, creating it (and any missing parents)
+     * if necessary. Closing the returned handle deletes only the sub-directory.
+     *
+     * @param child name of the sub-directory
+     * @throws UncheckedIOException if the sub-directory cannot be created
+     */
+    public AutoDeleteDirectory child(String child) {
+        File childDir = new File(tempFile, child);
+        if (!childDir.isDirectory() && !childDir.mkdirs()) {
+            throw new UncheckedIOException(
+                    new IOException("cannot create directory " + childDir.getAbsolutePath()));
+        }
+        return new AutoDeleteDirectory(childDir);
+    }
+
+    /** Returns the managed directory. */
+    public File getFile() {
+        return tempFile;
+    }
+
+    /**
+     * Deletes the directory and everything in it. Does nothing if the path no longer exists, for
+     * example because an enclosing directory was closed first.
+     *
+     * @throws IOException if the directory exists but cannot be deleted
+     */
+    @Override
+    public void close() throws IOException {
+        if (tempFile.exists()) {
+            FileUtils.forceDelete(tempFile);
+        }
+    }
+}
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java
new file mode 100644
index 0000000..2bf717c
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks that {@link AutoDeleteDirectory} starts out empty and is removed, with its contents, on close. */
+public class AutoDeleteDirectoryTest {
+
+    @Test
+    public void testBasic() throws IOException {
+        File tempFile = null;
+        try (AutoDeleteDirectory autoTempFile = new AutoDeleteDirectory("test", "")) {
+            Assert.assertTrue(autoTempFile.getFile().isDirectory());
+            Assert.assertEquals(0, autoTempFile.getFile().listFiles().length);
+            tempFile = autoTempFile.getFile();
+        }
+        Assert.assertFalse(tempFile.exists());
+    }
+
+    @Test
+    public void testDeletesNonEmptyDirectory() throws IOException {
+        File tempFile;
+        try (AutoDeleteDirectory autoTempFile = new AutoDeleteDirectory("test", "")) {
+            tempFile = autoTempFile.getFile();
+            File nested = new File(tempFile, "nested");
+            Assert.assertTrue(nested.mkdir());
+            Assert.assertTrue(new File(nested, "data.txt").createNewFile());
+        }
+        Assert.assertFalse(tempFile.exists());
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 8873f30..6a9158d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -622,24 +622,36 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
-    protected void cleanupTempConfFile(Configuration conf) {
-        String tempMetaFileString = conf.get("tmpfiles");
-        logger.trace("tempMetaFileString is : " + tempMetaFileString);
-        if (tempMetaFileString != null) {
-            if (tempMetaFileString.startsWith("file://")) {
-                tempMetaFileString = tempMetaFileString.substring("file://".length());
-                File tempMetaFile = new File(tempMetaFileString);
-                if (tempMetaFile.exists()) {
-                    try {
-                        FileUtils.forceDelete(tempMetaFile.getParentFile());
-
-                    } catch (IOException e) {
-                        logger.warn("error when deleting " + tempMetaFile, e);
-                    }
-                } else {
-                    logger.info("" + tempMetaFileString + " does not exist");
-                }
-            } else {
-                logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString);
-            }
-        }
+    /**
+     * Best-effort removal of the local directories holding the files listed in the job's
+     * {@code tmpfiles} setting. For every comma-separated {@code file://} entry that still exists,
+     * its parent directory is deleted recursively; other entries are only logged. Deletion
+     * failures are logged as warnings and not rethrown.
+     *
+     * @param conf the job configuration to read {@code tmpfiles} from
+     */
+    protected void cleanupTempConfFile(Configuration conf) {
+        String[] tempfiles = StringUtils.split(conf.get("tmpfiles"), ",");
+        if (tempfiles == null) {
+            return;
+        }
+        for (String tempMetaFileString : tempfiles) {
+            logger.trace("tempMetaFileString is : {}", tempMetaFileString);
+            if (!tempMetaFileString.startsWith("file://")) {
+                logger.info("tempMetaFileString is not starting with file:// :{}", tempMetaFileString);
+                continue;
+            }
+            String localPath = tempMetaFileString.substring("file://".length());
+            File tempMetaFile = new File(localPath);
+            if (!tempMetaFile.exists()) {
+                // several entries may share one directory that an earlier iteration already removed
+                logger.info("{} does not exist", localPath);
+                continue;
+            }
+            try {
+                // NOTE(review): assumes each entry lives in a job-private temp dir; confirm callers never add other files
+                FileUtils.forceDelete(tempMetaFile.getParentFile());
+            } catch (IOException e) {
+                logger.warn("error when deleting " + tempMetaFile, e);
+            }
+        }
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
index 64469a0..d1c88ab 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -18,9 +18,9 @@
 
 package org.apache.kylin.engine.mr.common;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.AutoDeleteDirectory;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceTool;
@@ -41,6 +41,9 @@ import java.util.Set;
 public class JobRelatedMetaUtil {
     private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class);
 
+    private JobRelatedMetaUtil() { // static utility class, not meant to be instantiated
+    }
+
     public static Set<String> collectCubeMetadata(CubeInstance cube) {
         // cube, model_desc, cube_desc, table
         Set<String> dumpList = new LinkedHashSet<>();
@@ -77,25 +80,38 @@ public class JobRelatedMetaUtil {
 
-    public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl)
-            throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
-        metaDir.mkdirs();
-
-        // dump metadata
-        dumpResources(kylinConfig, metaDir, dumpList);
-
-        // write kylin.properties
-        Properties props = kylinConfig.exportToProperties();
-        props.setProperty("kylin.metadata.url", metadataUrl);
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
-            props.store(os, kylinPropsFile.getAbsolutePath());
-        }
-
-        KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
-        //upload metadata
-        ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+    /**
+     * Dumps {@code dumpList} (via {@code dumpResources}) plus a kylin.properties whose
+     * {@code kylin.metadata.url} is set to {@code metadataUrl} into a local temp directory, then
+     * copies that directory to the metadata store configured by those properties. The temp
+     * directory is deleted afterwards, whether or not the copy succeeds.
+     *
+     * @param dumpList    resource paths to dump
+     * @param kylinConfig config passed to {@code dumpResources}; its properties are exported
+     * @param metadataUrl destination metadata URL written into the exported kylin.properties
+     * @throws IOException if dumping, writing kylin.properties or copying fails
+     */
+    public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl)
+            throws IOException {
+        // Closing tmpDir deletes it recursively, so "meta" needs no AutoDeleteDirectory of its own.
+        try (AutoDeleteDirectory tmpDir = new AutoDeleteDirectory("kylin_job_meta", "")) {
+            File metaDir = new File(tmpDir.getFile(), "meta");
+            if (!metaDir.isDirectory() && !metaDir.mkdirs()) {
+                throw new IOException("failed to create directory " + metaDir.getAbsolutePath());
+            }
+
+            // dump metadata
+            dumpResources(kylinConfig, metaDir, dumpList);
+
+            // write kylin.properties
+            Properties props = kylinConfig.exportToProperties();
+            props.setProperty("kylin.metadata.url", metadataUrl);
+            File kylinPropsFile = new File(metaDir, "kylin.properties");
+            try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
+                props.store(os, kylinPropsFile.getAbsolutePath());
+            }
+
+            KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+            //upload metadata
+            ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+        }
     }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java
index 108ec5a..57af711 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.kylin.common.persistence.AutoDeleteDirectory;
 import org.apache.kylin.metadata.badquery.BadQueryEntry;
 import org.apache.kylin.metadata.badquery.BadQueryHistory;
 import org.apache.kylin.rest.exception.InternalErrorException;
@@ -72,17 +73,18 @@ public class DiagnosisController extends BasicController {
     /**
      * Get diagnosis information for project
      */
-    @RequestMapping(value = "/project/{project}/download", method = { RequestMethod.GET }, produces = { "application/json" })
+    @RequestMapping(value = "/project/{project}/download", method = { RequestMethod.GET }, produces = {
+            "application/json" })
     @ResponseBody
-    public void dumpProjectDiagnosisInfo(@PathVariable String project, final HttpServletRequest request, final HttpServletResponse response) {
-        String filePath;
-        try {
-            filePath = dgService.dumpProjectDiagnosisInfo(project);
+    public void dumpProjectDiagnosisInfo(@PathVariable String project, final HttpServletRequest request,
+            final HttpServletResponse response) {
+        // diagDir and its contents are deleted when this try block exits, i.e. after setDownloadResponse returns
+        try (AutoDeleteDirectory diagDir = new AutoDeleteDirectory("diag_project", "")) {
+            String filePath = dgService.dumpProjectDiagnosisInfo(project, diagDir.getFile());
+            setDownloadResponse(filePath, response);
         } catch (IOException e) {
             throw new InternalErrorException("Failed to dump project diagnosis info. " + e.getMessage(), e);
         }
-
-        setDownloadResponse(filePath, response);
     }
 
     /**
@@ -90,15 +92,15 @@ public class DiagnosisController extends BasicController {
      */
     @RequestMapping(value = "/job/{jobId}/download", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
-    public void dumpJobDiagnosisInfo(@PathVariable String jobId, final HttpServletRequest request, final HttpServletResponse response) {
-        String filePath;
-        try {
-            filePath = dgService.dumpJobDiagnosisInfo(jobId);
+    public void dumpJobDiagnosisInfo(@PathVariable String jobId, final HttpServletRequest request,
+            final HttpServletResponse response) {
+        // diagDir and its contents are deleted when this try block exits, i.e. after setDownloadResponse returns
+        try (AutoDeleteDirectory diagDir = new AutoDeleteDirectory("diag_job", "")) {
+            String filePath = dgService.dumpJobDiagnosisInfo(jobId, diagDir.getFile());
+            setDownloadResponse(filePath, response);
         } catch (IOException e) {
             throw new InternalErrorException("Failed to dump job diagnosis info. " + e.getMessage(), e);
         }
-
-        setDownloadResponse(filePath, response);
     }
 
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java
index 57900eb..528858b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java
@@ -42,17 +42,12 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import com.google.common.collect.Lists;
-import com.google.common.io.Files;
 
 @Component("diagnosisService")
 public class DiagnosisService extends BasicService {
 
     private static final Logger logger = LoggerFactory.getLogger(DiagnosisService.class);
 
-    protected File getDumpDir() {
-        return Files.createTempDir();
-    }
-
     @Autowired
     private AclEvaluate aclEvaluate;
 
@@ -85,17 +80,15 @@ public class DiagnosisService extends BasicService {
         return getBadQueryHistoryManager().getBadQueriesForProject(project);
     }
 
-    public String dumpProjectDiagnosisInfo(String project) throws IOException {
+    public String dumpProjectDiagnosisInfo(String project, File exportPath) throws IOException {
         aclEvaluate.checkProjectOperationPermission(project);
-        File exportPath = getDumpDir();
         String[] args = { project, exportPath.getAbsolutePath() };
         runDiagnosisCLI(args);
         return getDiagnosisPackageName(exportPath);
     }
 
-    public String dumpJobDiagnosisInfo(String jobId) throws IOException {
+    public String dumpJobDiagnosisInfo(String jobId, File exportPath) throws IOException {
         aclEvaluate.checkProjectOperationPermission(jobService.getJobInstance(jobId));
-        File exportPath = getDumpDir();
         String[] args = { jobId, exportPath.getAbsolutePath() };
         runDiagnosisCLI(args);
         return getDiagnosisPackageName(exportPath);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index f51fce0..89764c9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.source.hive.cardinality;
 
@@ -53,68 +53,78 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
     protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true)
             .withDescription("The hive table name").create("table");
 
-    public HiveColumnCardinalityJob() {
-    }
-
+    /**
+     * Configures the MapReduce job that runs {@code ColumnCardinalityMapper} and a single
+     * {@code ColumnCardinalityReducer} over the given Hive table, submits it and waits for it.
+     *
+     * <p>If {@code job} is non-null when this method exits (normally or by exception),
+     * {@code cleanupTempConfFile} is invoked on its configuration.
+     *
+     * @param args command-line arguments, parsed by {@code parseOptions}
+     * @return the value returned by {@code waitForCompletion(job)}
+     */
     @Override
     public int run(String[] args) throws Exception {
+        try {
+            Options options = new Options();
 
-        Options options = new Options();
-
-        options.addOption(OPTION_PROJECT);
-        options.addOption(OPTION_TABLE);
-        options.addOption(OPTION_OUTPUT_PATH);
-
-        parseOptions(options, args);
+            options.addOption(OPTION_PROJECT);
+            options.addOption(OPTION_TABLE);
+            options.addOption(OPTION_OUTPUT_PATH);
 
-        // start job
-        String jobName = JOB_TITLE + getOptionsAsString();
-        logger.info("Starting: " + jobName);
-        Configuration conf = getConf();
+            parseOptions(options, args);
 
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
-        conf.addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
+            // start job
+            String jobName = JOB_TITLE + getOptionsAsString();
+            logger.info("Starting: {}", jobName);
+            Configuration conf = getConf();
 
-        job = Job.getInstance(conf, jobName);
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+            conf.addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
 
-        setJobClasspath(job, kylinConfig);
+            job = Job.getInstance(conf, jobName);
 
-        String project = getOptionValue(OPTION_PROJECT);
-        String table = getOptionValue(OPTION_TABLE);
-        job.getConfiguration().set(BatchConstants.CFG_PROJECT_NAME, project);
-        job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
+            setJobClasspath(job, kylinConfig);
 
-        Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-        FileOutputFormat.setOutputPath(job, output);
-        job.getConfiguration().set("dfs.blocksize", "67108864");
-        job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false");
+            String project = getOptionValue(OPTION_PROJECT);
+            String table = getOptionValue(OPTION_TABLE);
+            job.getConfiguration().set(BatchConstants.CFG_PROJECT_NAME, project);
+            job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
 
-        // Mapper
-        IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project,
-                getOptionValue(OPTION_CUBING_JOB_ID));
-        tableInputFormat.configureJob(job);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, output);
+            job.getConfiguration().set("dfs.blocksize", "67108864");
+            job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false");
 
-        job.setMapperClass(ColumnCardinalityMapper.class);
-        job.setMapOutputKeyClass(IntWritable.class);
-        job.setMapOutputValueClass(BytesWritable.class);
+            // Mapper
+            IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project,
+                    getOptionValue(OPTION_CUBING_JOB_ID));
+            tableInputFormat.configureJob(job);
 
-        // Reducer - only one
-        job.setReducerClass(ColumnCardinalityReducer.class);
-        job.setOutputFormatClass(TextOutputFormat.class);
-        job.setOutputKeyClass(IntWritable.class);
-        job.setOutputValueClass(LongWritable.class);
-        job.setNumReduceTasks(1);
+            job.setMapperClass(ColumnCardinalityMapper.class);
+            job.setMapOutputKeyClass(IntWritable.class);
+            job.setMapOutputValueClass(BytesWritable.class);
 
-        this.deletePath(job.getConfiguration(), output);
+            // Reducer - only one
+            job.setReducerClass(ColumnCardinalityReducer.class);
+            job.setOutputFormatClass(TextOutputFormat.class);
+            job.setOutputKeyClass(IntWritable.class);
+            job.setOutputValueClass(LongWritable.class);
+            job.setNumReduceTasks(1);
 
-        logger.info("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
+            this.deletePath(job.getConfiguration(), output);
 
-        TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(table, project);
-        attachTableMetadata(tableDesc, job.getConfiguration());
-        int result = waitForCompletion(job);
+            logger.info("Going to submit HiveColumnCardinalityJob for table '{}'", table);
 
-        return result;
+            TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(table, project);
+            attachTableMetadata(tableDesc, job.getConfiguration());
+            return waitForCompletion(job);
+        } finally {
+            if (job != null) {
+                cleanupTempConfFile(job.getConfiguration());
+            }
+        }
     }
 
 }