You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/22 15:32:52 UTC
carbondata git commit: [CARBONDATA-2230][BACKPORT-1.3]Add a path into
table path to store lock files and delete useless segment lock files before
loading
Repository: carbondata
Updated Branches:
refs/heads/branch-1.3 0299dd90a -> 6eb647144
[CARBONDATA-2230][BACKPORT-1.3]Add a path into table path to store lock files and delete useless segment lock files before loading
After PR1984 [https://github.com/apache/carbondata/pull/1984] was merged, lock files are no longer deleted on unlock, so many useless lock files accumulate in the table path; segment lock files in particular grow in number after every batch load.
Solution :
Add a child directory named LockFiles under the table path; all lock files will be stored in this directory.
Before loading, find all expired segment lock files and delete them; only segment lock files accumulate, while the number of other lock files doesn't grow.
This closes #2076
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6eb64714
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6eb64714
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6eb64714
Branch: refs/heads/branch-1.3
Commit: 6eb6471440aefc8d41bf8fde787c3547f7fd6276
Parents: 0299dd9
Author: Zhang Zhichao <44...@qq.com>
Authored: Thu Mar 8 15:18:09 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Mar 22 21:02:27 2018 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 11 ++++++
.../core/datastore/impl/FileFactory.java | 12 +++++--
.../carbondata/core/locks/CarbonLockUtil.java | 34 +++++++++++++++++++
.../carbondata/core/locks/HdfsFileLock.java | 35 ++++++++++++--------
.../carbondata/core/locks/LocalFileLock.java | 34 ++++++++-----------
.../carbondata/core/locks/ZooKeeperLocking.java | 7 ++--
.../carbondata/core/util/CarbonProperties.java | 20 +++++++++++
.../core/util/path/CarbonTablePath.java | 32 +++++++++++++++++-
.../carbondata/spark/util/DataLoadingUtil.scala | 2 ++
.../org/apache/spark/util/AlterTableUtil.scala | 4 +--
10 files changed, 149 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index aa93db8..f210408 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1570,6 +1570,17 @@ public final class CarbonCommonConstants {
public static final String CARBON_SHOW_DATAMAPS_DEFAULT = "true";
+ /**
+ * Currently the segment lock files are not deleted immediately when unlock,
+ * this value indicates the number of hours the segment lock files will be preserved.
+ */
+ @CarbonProperty
+ public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS =
+ "carbon.segment.lock.files.preserve.hours";
+
+ // default value is 2 days
+ public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT = "48";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 38ed2b7..cbbf9b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -177,8 +177,7 @@ public final class FileFactory {
}
/**
- * This method checks the given path exists or not and also is it file or
- * not if the performFileCheck is true
+ * This method checks the given path exists or not.
*
* @param filePath - Path
* @param fileType - FileType Local/HDFS
@@ -187,6 +186,15 @@ public final class FileFactory {
return getCarbonFile(filePath).isFileExist(filePath, fileType);
}
+ /**
+ * This method checks the given path exists or not.
+ *
+ * @param filePath - Path
+ */
+ public static boolean isFileExist(String filePath) throws IOException {
+ return isFileExist(filePath, getFileType(filePath));
+ }
+
public static boolean createNewFile(String filePath, FileType fileType) throws IOException {
return createNewFile(filePath, fileType, true, null);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index c399ef4..5ac2bc9 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -19,8 +19,13 @@ package org.apache.carbondata.core.locks;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
/**
* This class contains all carbon lock utilities
@@ -107,4 +112,33 @@ public class CarbonLockUtil {
}
}
+ /**
+ * Currently the segment lock files are not deleted immediately when unlock,
+ * so it needs to delete expired lock files before delete loads.
+ */
+ public static void deleteExpiredSegmentLockFiles(CarbonTable carbonTable) {
+ final long currTime = System.currentTimeMillis();
+ final long segmentLockFilesPreservTime =
+ CarbonProperties.getInstance().getSegmentLockFilesPreserveHours();
+ AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
+ String lockFilesDir = CarbonTablePath
+ .getLockFilesDirPath(absoluteTableIdentifier.getTablePath());
+ CarbonFile[] files = FileFactory.getCarbonFile(lockFilesDir)
+ .listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile pathName) {
+ if (CarbonTablePath.isSegmentLockFilePath(pathName.getName())) {
+ if ((currTime - pathName.getLastModifiedTime()) > segmentLockFilesPreservTime) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+ );
+
+ for (CarbonFile file : files) {
+ file.delete();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index be98f7d..3c28f9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -22,35 +22,38 @@ import java.io.IOException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
/**
* This class is used to handle the HDFS File locking.
- * This is acheived using the concept of acquiring the data out stream using Append option.
+ * This is achieved using the concept of acquiring the data out stream using Append option.
*/
public class HdfsFileLock extends AbstractCarbonLock {
private static final LogService LOGGER =
LogServiceFactory.getLogService(HdfsFileLock.class.getName());
/**
- * location hdfs file location
+ * lockFilePath is the location of the lock file.
*/
- private String location;
+ private String lockFilePath;
- private DataOutputStream dataOutputStream;
+ /**
+ * lockFileDir is the directory of the lock file.
+ */
+ private String lockFileDir;
- private static String tmpPath;
+ private DataOutputStream dataOutputStream;
/**
* @param lockFileLocation
* @param lockFile
*/
public HdfsFileLock(String lockFileLocation, String lockFile) {
- this.location = lockFileLocation
- + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
- LOGGER.info("HDFS lock path:" + this.location);
+ this.lockFileDir = CarbonTablePath.getLockFilesDirPath(lockFileLocation);
+ this.lockFilePath = CarbonTablePath.getLockFilePath(lockFileLocation, lockFile);
+ LOGGER.info("HDFS lock path:" + this.lockFilePath);
initRetry();
}
@@ -58,7 +61,7 @@ public class HdfsFileLock extends AbstractCarbonLock {
* @param lockFilePath
*/
public HdfsFileLock(String lockFilePath) {
- this.location = lockFilePath;
+ this.lockFilePath = lockFilePath;
initRetry();
}
@@ -75,11 +78,15 @@ public class HdfsFileLock extends AbstractCarbonLock {
*/
@Override public boolean lock() {
try {
- if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
- FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+ if (null != this.lockFileDir &&
+ !FileFactory.isFileExist(lockFileDir)) {
+ FileFactory.mkdirs(lockFileDir, FileFactory.getFileType(lockFileDir));
+ }
+ if (!FileFactory.isFileExist(lockFilePath)) {
+ FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(lockFilePath));
}
- dataOutputStream =
- FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
+ dataOutputStream = FileFactory.getDataOutputStreamUsingAppend(lockFilePath,
+ FileFactory.getFileType(lockFilePath));
return true;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 75ea074..a40f8de 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -25,9 +25,9 @@ import java.nio.channels.OverlappingFileLockException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
/**
* This class handles the file locking in the local file system.
@@ -35,9 +35,14 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
*/
public class LocalFileLock extends AbstractCarbonLock {
/**
- * location is the location of the lock file.
+ * lockFilePath is the location of the lock file.
*/
- private String location;
+ private String lockFilePath;
+
+ /**
+ * lockFileDir is the directory of the lock file.
+ */
+ private String lockFileDir;
/**
* fileOutputStream of the local lock file
@@ -55,27 +60,18 @@ public class LocalFileLock extends AbstractCarbonLock {
private FileLock fileLock;
/**
- * lock file
- */
- private String lockFile;
-
- private String lockFilePath;
-
- /**
* LOGGER for logging the messages.
*/
private static final LogService LOGGER =
LogServiceFactory.getLogService(LocalFileLock.class.getName());
-
-
/**
* @param lockFileLocation
* @param lockFile
*/
public LocalFileLock(String lockFileLocation, String lockFile) {
- this.location = lockFileLocation;
- this.lockFile = lockFile;
+ this.lockFileDir = CarbonTablePath.getLockFilesDirPath(lockFileLocation);
+ this.lockFilePath = CarbonTablePath.getLockFilePath(lockFileLocation, lockFile);
initRetry();
}
@@ -95,13 +91,11 @@ public class LocalFileLock extends AbstractCarbonLock {
*/
@Override public boolean lock() {
try {
- if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
- FileFactory.mkdirs(location, FileFactory.getFileType(location));
+ if (!FileFactory.isFileExist(lockFileDir)) {
+ FileFactory.mkdirs(lockFileDir, FileFactory.getFileType(lockFileDir));
}
- lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR +
- lockFile;
- if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) {
- FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
+ if (!FileFactory.isFileExist(lockFilePath)) {
+ FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(lockFilePath));
}
fileOutputStream = new FileOutputStream(lockFilePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
index 1de5004..5a055ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -88,12 +89,12 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
*/
public ZooKeeperLocking(String lockLocation, String lockFile) {
this.lockName = lockFile;
- this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
+ this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR
+ + CarbonTablePath.getLockFilesDirPath(lockLocation);
initialize();
- this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
- + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+ this.lockTypeFolder = tableIdFolder + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
try {
createBaseNode();
// if exists returns null then path doesnt exist. so creating.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 667c45c..a2ace69 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1370,4 +1370,24 @@ public final class CarbonProperties {
return thresholdSize;
}
+ /**
+ * Get the number of hours the segment lock files will be preserved.
+ * It will be converted to milliseconds to return.
+ */
+ public long getSegmentLockFilesPreserveHours() {
+ long preserveSeconds;
+ try {
+ int preserveHours = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS,
+ CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT));
+ preserveSeconds = preserveHours * 3600 * 1000L;
+ } catch (NumberFormatException exc) {
+ LOGGER.error(
+ "The segment lock files preserv hours is invalid. Using the default value "
+ + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT);
+ preserveSeconds = Integer.parseInt(
+ CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT) * 3600 * 1000L;
+ }
+ return preserveSeconds;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index cb264c4..5584e45 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -44,6 +45,7 @@ public class CarbonTablePath extends Path {
private static final String PARTITION_PREFIX = "Part";
private static final String DATA_PART_PREFIX = "part-";
private static final String BATCH_PREFIX = "_batchno";
+ private static final String LOCK_DIR = "LockFiles";
public static final String CARBON_DATA_EXT = ".carbondata";
public static final String INDEX_FILE_EXT = ".carbonindex";
@@ -763,4 +765,32 @@ public class CarbonTablePath extends Path {
return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments";
}
-}
\ No newline at end of file
+ /**
+ * Get the lock files directory
+ */
+ public static String getLockFilesDirPath(String tablePath) {
+ return tablePath + CarbonCommonConstants.FILE_SEPARATOR + LOCK_DIR;
+ }
+
+ /**
+ * Get the lock file
+ */
+ public static String getLockFilePath(String tablePath, String lockType) {
+ return getLockFilesDirPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + lockType;
+ }
+
+ /**
+ * Get the segment lock file according to table path and segment load name
+ */
+ public static String getSegmentLockFilePath(String tablePath, String loadName) {
+ return getLockFilesDirPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR +
+ addSegmentPrefix(loadName) + LockUsage.LOCK;
+ }
+
+ /**
+ * return true if this lock file is a segment lock file otherwise false.
+ */
+ public static boolean isSegmentLockFilePath(String lockFileName) {
+ return lockFileName.startsWith(SEGMENT_PREFIX) && lockFileName.endsWith(LockUsage.LOCK);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 6767ef7..a60e593 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -437,6 +437,8 @@ object DataLoadingUtil {
}
}
}
+ // delete the expired segment lock files
+ CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable)
}
private def isUpdationRequired(isForceDeletion: Boolean,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8ebd5a9..c59b1a3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -112,8 +113,7 @@ object AlterTableUtil {
tablePath: String): Unit = {
val lockLocation = tablePath
locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
- val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR +
- lockType
+ val lockFilePath = CarbonTablePath.getLockFilePath(lockLocation, lockType)
if (carbonLock.releaseLockManually(lockFilePath)) {
LOGGER.info(s"Alter table lock released successfully: ${ lockType }")
} else {