You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/19 02:20:31 UTC
[kylin] branch master updated: KYLIN-3619 Some jobs don't clean up their
temp directory after finishing
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 30c484c KYLIN-3619 Some jobs don't clean up their temp directory after finishing
30c484c is described below
commit 30c484c33c2ddc728e4bc522dc00c97f7999c4b8
Author: Enwei Jiao <en...@kyligence.io>
AuthorDate: Fri Oct 19 09:35:03 2018 +0800
KYLIN-3619 Some jobs don't clean up their temp directory after finishing
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../common/persistence/AutoDeleteDirectory.java | 58 +++++++++++++
.../persistence/AutoDeleteDirectoryTest.java | 39 +++++++++
.../kylin/engine/mr/common/AbstractHadoopJob.java | 35 ++++----
.../kylin/engine/mr/common/JobRelatedMetaUtil.java | 38 +++++----
.../kylin/rest/controller/DiagnosisController.java | 24 +++---
.../kylin/rest/service/DiagnosisService.java | 11 +--
.../hive/cardinality/HiveColumnCardinalityJob.java | 97 +++++++++++-----------
8 files changed, 205 insertions(+), 101 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e54d722..5577307 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -103,6 +103,10 @@ abstract public class KylinConfigBase implements Serializable {
return getKylinHome() + File.separator + "spark";
}
+ /**
+ * Returns the JVM's default temporary-file directory, i.e. the value of the
+ * {@code java.io.tmpdir} system property ({@code null} if that property is unset).
+ */
+ public static String getTempDir() {
+ return System.getProperty("java.io.tmpdir");
+ }
+
// backward compatibility check happens when properties is loaded or updated
static BackwardCompatibilityConfig BCC = new BackwardCompatibilityConfig();
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java b/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java
new file mode 100644
index 0000000..a496ba8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A local directory that is deleted, together with everything inside it,
+ * when {@link #close()} is called. Intended for try-with-resources so that
+ * temporary directories are removed even when the enclosed code throws.
+ */
+public class AutoDeleteDirectory implements Closeable {
+
+ private final File tempFile;
+
+ /**
+ * Wraps an existing path. Nothing is created on disk; {@link #close()}
+ * deletes the path if it exists at that time.
+ *
+ * @param file the directory (or file) to delete on close
+ */
+ public AutoDeleteDirectory(File file) {
+ tempFile = file;
+ }
+
+ /**
+ * Creates a new, empty directory in the default temporary-file directory
+ * whose name starts with {@code prefix} and ends with {@code suffix}.
+ *
+ * @throws RuntimeException if the directory cannot be created
+ */
+ public AutoDeleteDirectory(String prefix, String suffix) {
+ try {
+ tempFile = File.createTempFile(prefix, suffix);
+ // we need a directory, so delete the file first and reuse its unique name
+ org.apache.commons.io.FileUtils.forceDelete(tempFile);
+ if (!tempFile.mkdirs()) {
+ throw new IOException("cannot create directory " + tempFile.getAbsolutePath());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("create temp file " + prefix + "****" + suffix + " failed", e);
+ }
+ }
+
+ /** @return the absolute path of the managed directory */
+ public String getAbsolutePath() {
+ return tempFile.getAbsolutePath();
+ }
+
+ /**
+ * Returns a handle for the sub-directory {@code child}, creating it (and any
+ * missing parents) if it does not exist yet. Closing the returned handle
+ * deletes only that sub-directory.
+ *
+ * @throws RuntimeException if the sub-directory cannot be created
+ */
+ public AutoDeleteDirectory child(String child) {
+ File childDir = new File(tempFile, child);
+ if (!childDir.isDirectory() && !childDir.mkdirs()) {
+ throw new RuntimeException("create directory " + childDir.getAbsolutePath() + " failed");
+ }
+ return new AutoDeleteDirectory(childDir);
+ }
+
+ /** @return the managed directory */
+ public File getFile() {
+ return tempFile;
+ }
+
+ /**
+ * Recursively deletes the directory. A path that does not exist (never
+ * created, or already removed by closing an enclosing directory first) is skipped.
+ *
+ * @throws IOException if the path exists but could not be deleted
+ */
+ @Override
+ public void close() throws IOException {
+ if (tempFile.exists()) {
+ org.apache.commons.io.FileUtils.forceDelete(tempFile);
+ }
+ }
+}
diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java
new file mode 100644
index 0000000..2bf717c
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.persistence;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AutoDeleteDirectoryTest {
+
+ /** A directory created from (prefix, suffix) starts out empty and is gone after close(). */
+ @Test
+ public void testBasic() throws IOException {
+ File tempFile = null;
+ try (AutoDeleteDirectory autoTempFile = new AutoDeleteDirectory("test", "")) {
+ Assert.assertTrue(autoTempFile.getFile().isDirectory());
+ Assert.assertEquals(0, autoTempFile.getFile().listFiles().length);
+ tempFile = autoTempFile.getFile();
+ }
+ Assert.assertFalse(tempFile.exists());
+ }
+
+ /** close() removes the directory recursively, including files created inside it. */
+ @Test
+ public void testDeletesContents() throws IOException {
+ File dir = null;
+ try (AutoDeleteDirectory autoDir = new AutoDeleteDirectory("test", "")) {
+ dir = autoDir.getFile();
+ Assert.assertTrue(new File(dir, "nested.txt").createNewFile());
+ }
+ Assert.assertFalse(dir.exists());
+ }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 8873f30..6a9158d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -622,24 +622,40 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
+ /**
+ * Best-effort removal of the local temp directories referenced by the comma-separated
+ * {@code tmpfiles} property of {@code conf}: for every {@code file://} entry that still
+ * exists, its parent directory is deleted recursively. Deletion failures are logged, not thrown.
+ */
protected void cleanupTempConfFile(Configuration conf) {
- String tempMetaFileString = conf.get("tmpfiles");
- logger.trace("tempMetaFileString is : " + tempMetaFileString);
- if (tempMetaFileString != null) {
- if (tempMetaFileString.startsWith("file://")) {
- tempMetaFileString = tempMetaFileString.substring("file://".length());
- File tempMetaFile = new File(tempMetaFileString);
- if (tempMetaFile.exists()) {
- try {
- FileUtils.forceDelete(tempMetaFile.getParentFile());
-
- } catch (IOException e) {
- logger.warn("error when deleting " + tempMetaFile, e);
- }
- } else {
- logger.info("" + tempMetaFileString + " does not exist");
- }
- } else {
- logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString);
- }
- }
+ String[] tempfiles = StringUtils.split(conf.get("tmpfiles"), ",");
+ if (tempfiles == null) {
+ return;
+ }
+ for (String entry : tempfiles) {
+ String tempMetaFileString = entry.trim();
+ logger.trace("tempMetaFileString is : {}", tempMetaFileString);
+ if (!tempMetaFileString.startsWith("file://")) {
+ logger.info("tempMetaFileString is not starting with file:// :{}", tempMetaFileString);
+ continue;
+ }
+ String tempMetaPath = tempMetaFileString.substring("file://".length());
+ File tempMetaFile = new File(tempMetaPath);
+ if (!tempMetaFile.exists()) {
+ // e.g. an earlier entry under the same parent directory has already removed it
+ logger.info("{} does not exist", tempMetaPath);
+ continue;
+ }
+ File parentDir = tempMetaFile.getParentFile();
+ if (parentDir == null) {
+ logger.warn("{} has no parent directory, skip deleting it", tempMetaFile);
+ continue;
+ }
+ try {
+ // NOTE(review): this removes the whole parent directory of every file:// entry, not only
+ // Kylin's own metadata dump; confirm "tmpfiles" never lists files the user still needs.
+ FileUtils.forceDelete(parentDir);
+ } catch (IOException e) {
+ logger.warn("error when deleting {}", tempMetaFile, e);
+ }
+ }
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
index 64469a0..d1c88ab 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -18,9 +18,9 @@
package org.apache.kylin.engine.mr.common;
-import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.AutoDeleteDirectory;
import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.ResourceTool;
@@ -41,6 +41,10 @@ import java.util.Set;
public class JobRelatedMetaUtil {
private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class);
+ private JobRelatedMetaUtil() {
+ // static utility class; not meant to be instantiated
+ }
+
public static Set<String> collectCubeMetadata(CubeInstance cube) {
// cube, model_desc, cube_desc, table
Set<String> dumpList = new LinkedHashSet<>();
@@ -77,25 +81,32 @@ public class JobRelatedMetaUtil {
+ /**
+ * Dumps the resources in {@code dumpList} plus a kylin.properties whose
+ * {@code kylin.metadata.url} is {@code metadataUrl} into a local temp directory, copies them
+ * into the metadata store configured by those properties, and deletes the temp directory
+ * afterwards (also when an exception is thrown).
+ */
public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl)
throws IOException {
- File tmp = File.createTempFile("kylin_job_meta", "");
- FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
- File metaDir = new File(tmp, "meta");
- metaDir.mkdirs();
+ try (AutoDeleteDirectory tmpDir = new AutoDeleteDirectory("kylin_job_meta", "");
+ AutoDeleteDirectory metaDir = tmpDir.child("meta")) {
+ // ensure the sub-directory exists before dumping into it
+ metaDir.getFile().mkdirs();
- // dump metadata
- dumpResources(kylinConfig, metaDir, dumpList);
+ // dump metadata
+ dumpResources(kylinConfig, metaDir.getFile(), dumpList);
- // write kylin.properties
- Properties props = kylinConfig.exportToProperties();
- props.setProperty("kylin.metadata.url", metadataUrl);
- File kylinPropsFile = new File(metaDir, "kylin.properties");
- try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
- props.store(os, kylinPropsFile.getAbsolutePath());
- }
+ // write kylin.properties
+ Properties props = kylinConfig.exportToProperties();
+ props.setProperty("kylin.metadata.url", metadataUrl);
+ File kylinPropsFile = new File(metaDir.getFile(), "kylin.properties");
+ try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
+ props.store(os, kylinPropsFile.getAbsolutePath());
+ }
- KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
- //upload metadata
- ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+ KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+ //upload metadata
+ ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+ }
}
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java
index 108ec5a..57af711 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java
@@ -25,6 +25,7 @@ import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.kylin.common.persistence.AutoDeleteDirectory;
import org.apache.kylin.metadata.badquery.BadQueryEntry;
import org.apache.kylin.metadata.badquery.BadQueryHistory;
import org.apache.kylin.rest.exception.InternalErrorException;
@@ -72,17 +73,19 @@ public class DiagnosisController extends BasicController {
/**
* Get diagnosis information for project
*/
- @RequestMapping(value = "/project/{project}/download", method = { RequestMethod.GET }, produces = { "application/json" })
+ @RequestMapping(value = "/project/{project}/download", method = { RequestMethod.GET }, produces = {
+ "application/json" })
@ResponseBody
- public void dumpProjectDiagnosisInfo(@PathVariable String project, final HttpServletRequest request, final HttpServletResponse response) {
- String filePath;
- try {
- filePath = dgService.dumpProjectDiagnosisInfo(project);
+ public void dumpProjectDiagnosisInfo(@PathVariable String project, final HttpServletRequest request,
+ final HttpServletResponse response) {
+ // respond inside the try: diagDir is deleted recursively as soon as the try block exits
+ try (AutoDeleteDirectory diagDir = new AutoDeleteDirectory("diag_project", "")) {
+ String filePath = dgService.dumpProjectDiagnosisInfo(project, diagDir.getFile());
+ setDownloadResponse(filePath, response);
} catch (IOException e) {
throw new InternalErrorException("Failed to dump project diagnosis info. " + e.getMessage(), e);
}
- setDownloadResponse(filePath, response);
}
/**
@@ -90,15 +93,16 @@ public class DiagnosisController extends BasicController {
*/
@RequestMapping(value = "/job/{jobId}/download", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
- public void dumpJobDiagnosisInfo(@PathVariable String jobId, final HttpServletRequest request, final HttpServletResponse response) {
- String filePath;
- try {
- filePath = dgService.dumpJobDiagnosisInfo(jobId);
+ public void dumpJobDiagnosisInfo(@PathVariable String jobId, final HttpServletRequest request,
+ final HttpServletResponse response) {
+ // respond inside the try: diagDir is deleted recursively as soon as the try block exits
+ try (AutoDeleteDirectory diagDir = new AutoDeleteDirectory("diag_job", "")) {
+ String filePath = dgService.dumpJobDiagnosisInfo(jobId, diagDir.getFile());
+ setDownloadResponse(filePath, response);
} catch (IOException e) {
throw new InternalErrorException("Failed to dump job diagnosis info. " + e.getMessage(), e);
}
- setDownloadResponse(filePath, response);
}
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java
index 57900eb..528858b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java
@@ -42,17 +42,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.collect.Lists;
-import com.google.common.io.Files;
@Component("diagnosisService")
public class DiagnosisService extends BasicService {
private static final Logger logger = LoggerFactory.getLogger(DiagnosisService.class);
- protected File getDumpDir() {
- return Files.createTempDir();
- }
-
@Autowired
private AclEvaluate aclEvaluate;
@@ -85,17 +80,15 @@ public class DiagnosisService extends BasicService {
return getBadQueryHistoryManager().getBadQueriesForProject(project);
}
- public String dumpProjectDiagnosisInfo(String project) throws IOException {
+ /**
+ * Runs the diagnosis CLI for {@code project} after an ACL check on that project.
+ *
+ * @param project the project to diagnose
+ * @param exportPath directory whose absolute path is handed to the CLI as its output location;
+ * it is created and cleaned up by the caller, not by this method
+ * @return getDiagnosisPackageName(exportPath) -- presumably the path of the generated package; confirm
+ */
+ public String dumpProjectDiagnosisInfo(String project, File exportPath) throws IOException {
aclEvaluate.checkProjectOperationPermission(project);
- File exportPath = getDumpDir();
String[] args = { project, exportPath.getAbsolutePath() };
runDiagnosisCLI(args);
return getDiagnosisPackageName(exportPath);
}
- public String dumpJobDiagnosisInfo(String jobId) throws IOException {
+ /**
+ * Runs the diagnosis CLI for job {@code jobId} after an ACL check on the job instance.
+ *
+ * @param jobId the job to diagnose
+ * @param exportPath directory whose absolute path is handed to the CLI as its output location;
+ * it is created and cleaned up by the caller, not by this method
+ * @return getDiagnosisPackageName(exportPath) -- presumably the path of the generated package; confirm
+ */
+ public String dumpJobDiagnosisInfo(String jobId, File exportPath) throws IOException {
aclEvaluate.checkProjectOperationPermission(jobService.getJobInstance(jobId));
- File exportPath = getDumpDir();
String[] args = { jobId, exportPath.getAbsolutePath() };
runDiagnosisCLI(args);
return getDiagnosisPackageName(exportPath);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index f51fce0..89764c9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.hive.cardinality;
@@ -53,68 +53,76 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true)
.withDescription("The hive table name").create("table");
- public HiveColumnCardinalityJob() {
- }
-
+ /**
+ * Builds and runs the column-cardinality MR job for the table given by OPTION_TABLE (in the
+ * project given by OPTION_PROJECT), writing its output to OPTION_OUTPUT_PATH. The local temp
+ * files listed in the job configuration's "tmpfiles" are cleaned up whether or not the job succeeds.
+ *
+ * @return the result of {@link #waitForCompletion}
+ */
@Override
public int run(String[] args) throws Exception {
+ try {
+ Options options = new Options();
- Options options = new Options();
-
- options.addOption(OPTION_PROJECT);
- options.addOption(OPTION_TABLE);
- options.addOption(OPTION_OUTPUT_PATH);
-
- parseOptions(options, args);
+ options.addOption(OPTION_PROJECT);
+ options.addOption(OPTION_TABLE);
+ options.addOption(OPTION_OUTPUT_PATH);
- // start job
- String jobName = JOB_TITLE + getOptionsAsString();
- logger.info("Starting: " + jobName);
- Configuration conf = getConf();
+ parseOptions(options, args);
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
- conf.addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
+ // start job
+ String jobName = JOB_TITLE + getOptionsAsString();
+ logger.info("Starting: {}", jobName);
+ Configuration conf = getConf();
- job = Job.getInstance(conf, jobName);
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
+ conf.addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
- setJobClasspath(job, kylinConfig);
+ job = Job.getInstance(conf, jobName);
- String project = getOptionValue(OPTION_PROJECT);
- String table = getOptionValue(OPTION_TABLE);
- job.getConfiguration().set(BatchConstants.CFG_PROJECT_NAME, project);
- job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
+ setJobClasspath(job, kylinConfig);
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
- job.getConfiguration().set("dfs.blocksize", "67108864");
- job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false");
+ String project = getOptionValue(OPTION_PROJECT);
+ String table = getOptionValue(OPTION_TABLE);
+ job.getConfiguration().set(BatchConstants.CFG_PROJECT_NAME, project);
+ job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
- // Mapper
- IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project,
- getOptionValue(OPTION_CUBING_JOB_ID));
- tableInputFormat.configureJob(job);
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ FileOutputFormat.setOutputPath(job, output);
+ job.getConfiguration().set("dfs.blocksize", "67108864");
+ job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false");
- job.setMapperClass(ColumnCardinalityMapper.class);
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(BytesWritable.class);
+ // Mapper
+ IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project,
+ getOptionValue(OPTION_CUBING_JOB_ID));
+ tableInputFormat.configureJob(job);
- // Reducer - only one
- job.setReducerClass(ColumnCardinalityReducer.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
+ job.setMapperClass(ColumnCardinalityMapper.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(BytesWritable.class);
- this.deletePath(job.getConfiguration(), output);
+ // Reducer - only one
+ job.setReducerClass(ColumnCardinalityReducer.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(LongWritable.class);
+ job.setNumReduceTasks(1);
- logger.info("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
+ this.deletePath(job.getConfiguration(), output);
- TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(table, project);
- attachTableMetadata(tableDesc, job.getConfiguration());
- int result = waitForCompletion(job);
+ logger.info("Going to submit HiveColumnCardinalityJob for table '{}'", table);
- return result;
+ TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(table, project);
+ attachTableMetadata(tableDesc, job.getConfiguration());
+ return waitForCompletion(job);
+ } finally {
+ // always release local temp files (presumably the metadata registered by attachTableMetadata)
+ if (job != null) {
+ cleanupTempConfFile(job.getConfiguration());
+ }
+ }
}
}