You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/22 15:32:52 UTC

carbondata git commit: [CARBONDATA-2230][BACKPORT-1.3]Add a path into table path to store lock files and delete useless segment lock files before loading

Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 0299dd90a -> 6eb647144


[CARBONDATA-2230][BACKPORT-1.3]Add a path into table path to store lock files and delete useless segment lock files before loading

After PR1984[https://github.com/apache/carbondata/pull/1984] merged, it doesn't delete the lock files when unlock, there are many useless lock files in table path, especially segment lock files, they grow after every batch loading.

Solution :

add a child path into table path, called Locks, all lock files will be stored in this path;
Before loading, get all useless segment lock files and delete them, because just segment lock files will grow, other lock files dosen't grow.

This closes #2076


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6eb64714
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6eb64714
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6eb64714

Branch: refs/heads/branch-1.3
Commit: 6eb6471440aefc8d41bf8fde787c3547f7fd6276
Parents: 0299dd9
Author: Zhang Zhichao <44...@qq.com>
Authored: Thu Mar 8 15:18:09 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Mar 22 21:02:27 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 11 ++++++
 .../core/datastore/impl/FileFactory.java        | 12 +++++--
 .../carbondata/core/locks/CarbonLockUtil.java   | 34 +++++++++++++++++++
 .../carbondata/core/locks/HdfsFileLock.java     | 35 ++++++++++++--------
 .../carbondata/core/locks/LocalFileLock.java    | 34 ++++++++-----------
 .../carbondata/core/locks/ZooKeeperLocking.java |  7 ++--
 .../carbondata/core/util/CarbonProperties.java  | 20 +++++++++++
 .../core/util/path/CarbonTablePath.java         | 32 +++++++++++++++++-
 .../carbondata/spark/util/DataLoadingUtil.scala |  2 ++
 .../org/apache/spark/util/AlterTableUtil.scala  |  4 +--
 10 files changed, 149 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index aa93db8..f210408 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1570,6 +1570,17 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SHOW_DATAMAPS_DEFAULT = "true";
 
+  /**
+   * Currently the segment lock files are not deleted immediately when unlock,
+   * this value indicates the number of hours the segment lock files will be preserved.
+   */
+  @CarbonProperty
+  public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS =
+      "carbon.segment.lock.files.preserve.hours";
+
+  // default value is 2 days
+  public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT = "48";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 38ed2b7..cbbf9b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -177,8 +177,7 @@ public final class FileFactory {
   }
 
   /**
-   * This method checks the given path exists or not and also is it file or
-   * not if the performFileCheck is true
+   * This method checks the given path exists or not.
    *
    * @param filePath - Path
    * @param fileType - FileType Local/HDFS
@@ -187,6 +186,15 @@ public final class FileFactory {
     return getCarbonFile(filePath).isFileExist(filePath, fileType);
   }
 
+  /**
+   * This method checks the given path exists or not.
+   *
+   * @param filePath - Path
+   */
+  public static boolean isFileExist(String filePath) throws IOException {
+    return isFileExist(filePath, getFileType(filePath));
+  }
+
   public static boolean createNewFile(String filePath, FileType fileType) throws IOException {
     return createNewFile(filePath, fileType, true, null);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index c399ef4..5ac2bc9 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -19,8 +19,13 @@ package org.apache.carbondata.core.locks;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * This class contains all carbon lock utilities
@@ -107,4 +112,33 @@ public class CarbonLockUtil {
     }
   }
 
+  /**
+   * Currently the segment lock files are not deleted immediately when unlock,
+   * so it needs to delete expired lock files before delete loads.
+   */
+  public static void deleteExpiredSegmentLockFiles(CarbonTable carbonTable) {
+    final long currTime = System.currentTimeMillis();
+    final long segmentLockFilesPreservTime =
+        CarbonProperties.getInstance().getSegmentLockFilesPreserveHours();
+    AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
+    String lockFilesDir = CarbonTablePath
+        .getLockFilesDirPath(absoluteTableIdentifier.getTablePath());
+    CarbonFile[] files = FileFactory.getCarbonFile(lockFilesDir)
+        .listFiles(new CarbonFileFilter() {
+
+            @Override public boolean accept(CarbonFile pathName) {
+              if (CarbonTablePath.isSegmentLockFilePath(pathName.getName())) {
+                if ((currTime - pathName.getLastModifiedTime()) > segmentLockFilesPreservTime) {
+                  return true;
+                }
+              }
+              return false;
+            }
+        }
+    );
+
+    for (CarbonFile file : files) {
+      file.delete();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index be98f7d..3c28f9d 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -22,35 +22,38 @@ import java.io.IOException;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * This class is used to handle the HDFS File locking.
- * This is acheived using the concept of acquiring the data out stream using Append option.
+ * This is achieved using the concept of acquiring the data out stream using Append option.
  */
 public class HdfsFileLock extends AbstractCarbonLock {
 
   private static final LogService LOGGER =
              LogServiceFactory.getLogService(HdfsFileLock.class.getName());
   /**
-   * location hdfs file location
+   * lockFilePath is the location of the lock file.
    */
-  private String location;
+  private String lockFilePath;
 
-  private DataOutputStream dataOutputStream;
+  /**
+   * lockFileDir is the directory of the lock file.
+   */
+  private String lockFileDir;
 
-  private static String tmpPath;
+  private DataOutputStream dataOutputStream;
 
   /**
    * @param lockFileLocation
    * @param lockFile
    */
   public HdfsFileLock(String lockFileLocation, String lockFile) {
-    this.location = lockFileLocation
-        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
-    LOGGER.info("HDFS lock path:" + this.location);
+    this.lockFileDir = CarbonTablePath.getLockFilesDirPath(lockFileLocation);
+    this.lockFilePath = CarbonTablePath.getLockFilePath(lockFileLocation, lockFile);
+    LOGGER.info("HDFS lock path:" + this.lockFilePath);
     initRetry();
   }
 
@@ -58,7 +61,7 @@ public class HdfsFileLock extends AbstractCarbonLock {
    * @param lockFilePath
    */
   public HdfsFileLock(String lockFilePath) {
-    this.location = lockFilePath;
+    this.lockFilePath = lockFilePath;
     initRetry();
   }
 
@@ -75,11 +78,15 @@ public class HdfsFileLock extends AbstractCarbonLock {
    */
   @Override public boolean lock() {
     try {
-      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
-        FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+      if (null != this.lockFileDir &&
+          !FileFactory.isFileExist(lockFileDir)) {
+        FileFactory.mkdirs(lockFileDir, FileFactory.getFileType(lockFileDir));
+      }
+      if (!FileFactory.isFileExist(lockFilePath)) {
+        FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(lockFilePath));
       }
-      dataOutputStream =
-          FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
+      dataOutputStream = FileFactory.getDataOutputStreamUsingAppend(lockFilePath,
+          FileFactory.getFileType(lockFilePath));
 
       return true;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 75ea074..a40f8de 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -25,9 +25,9 @@ import java.nio.channels.OverlappingFileLockException;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * This class handles the file locking in the local file system.
@@ -35,9 +35,14 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
  */
 public class LocalFileLock extends AbstractCarbonLock {
   /**
-   * location is the location of the lock file.
+   * lockFilePath is the location of the lock file.
    */
-  private String location;
+  private String lockFilePath;
+
+  /**
+   * lockFileDir is the directory of the lock file.
+   */
+  private String lockFileDir;
 
   /**
    * fileOutputStream of the local lock file
@@ -55,27 +60,18 @@ public class LocalFileLock extends AbstractCarbonLock {
   private FileLock fileLock;
 
   /**
-   * lock file
-   */
-  private String lockFile;
-
-  private  String lockFilePath;
-
-  /**
    * LOGGER for  logging the messages.
    */
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(LocalFileLock.class.getName());
 
-
-
   /**
    * @param lockFileLocation
    * @param lockFile
    */
   public LocalFileLock(String lockFileLocation, String lockFile) {
-    this.location = lockFileLocation;
-    this.lockFile = lockFile;
+    this.lockFileDir = CarbonTablePath.getLockFilesDirPath(lockFileLocation);
+    this.lockFilePath = CarbonTablePath.getLockFilePath(lockFileLocation, lockFile);
     initRetry();
   }
 
@@ -95,13 +91,11 @@ public class LocalFileLock extends AbstractCarbonLock {
    */
   @Override public boolean lock() {
     try {
-      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
-        FileFactory.mkdirs(location, FileFactory.getFileType(location));
+      if (!FileFactory.isFileExist(lockFileDir)) {
+        FileFactory.mkdirs(lockFileDir, FileFactory.getFileType(lockFileDir));
       }
-      lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR +
-          lockFile;
-      if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) {
-        FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
+      if (!FileFactory.isFileExist(lockFilePath)) {
+        FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(lockFilePath));
       }
 
       fileOutputStream = new FileOutputStream(lockFilePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
index 1de5004..5a055ab 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -88,12 +89,12 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
    */
   public ZooKeeperLocking(String lockLocation, String lockFile) {
     this.lockName = lockFile;
-    this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation;
+    this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonTablePath.getLockFilesDirPath(lockLocation);
 
     initialize();
 
-    this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + lockLocation
-        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+    this.lockTypeFolder = tableIdFolder + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
     try {
       createBaseNode();
       // if exists returns null then path doesnt exist. so creating.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 667c45c..a2ace69 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1370,4 +1370,24 @@ public final class CarbonProperties {
     return thresholdSize;
   }
 
+  /**
+   * Get the number of hours the segment lock files will be preserved.
+   * It will be converted to microseconds to return.
+   */
+  public long getSegmentLockFilesPreserveHours() {
+    long preserveSeconds;
+    try {
+      int preserveHours = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS,
+              CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT));
+      preserveSeconds = preserveHours * 3600 * 1000L;
+    } catch (NumberFormatException exc) {
+      LOGGER.error(
+          "The segment lock files preserv hours is invalid. Using the default value "
+              + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT);
+      preserveSeconds = Integer.parseInt(
+          CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT) * 3600 * 1000L;
+    }
+    return preserveSeconds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index cb264c4..5584e45 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
@@ -44,6 +45,7 @@ public class CarbonTablePath extends Path {
   private static final String PARTITION_PREFIX = "Part";
   private static final String DATA_PART_PREFIX = "part-";
   private static final String BATCH_PREFIX = "_batchno";
+  private static final String LOCK_DIR = "LockFiles";
 
   public static final String CARBON_DATA_EXT = ".carbondata";
   public static final String INDEX_FILE_EXT = ".carbonindex";
@@ -763,4 +765,32 @@ public class CarbonTablePath extends Path {
     return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments";
   }
 
-}
\ No newline at end of file
+  /**
+   * Get the lock files directory
+   */
+  public static String getLockFilesDirPath(String tablePath) {
+    return tablePath + CarbonCommonConstants.FILE_SEPARATOR + LOCK_DIR;
+  }
+
+  /**
+   * Get the lock file
+   */
+  public static String getLockFilePath(String tablePath, String lockType) {
+    return getLockFilesDirPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + lockType;
+  }
+
+  /**
+   * Get the segment lock file according to table path and segment load name
+   */
+  public static String getSegmentLockFilePath(String tablePath, String loadName) {
+    return getLockFilesDirPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR +
+        addSegmentPrefix(loadName) + LockUsage.LOCK;
+  }
+
+  /**
+   * return true if this lock file is a segment lock file otherwise false.
+   */
+  public static boolean isSegmentLockFilePath(String lockFileName) {
+    return lockFileName.startsWith(SEGMENT_PREFIX) && lockFileName.endsWith(LockUsage.LOCK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 6767ef7..a60e593 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -437,6 +437,8 @@ object DataLoadingUtil {
         }
       }
     }
+    // delete the expired segment lock files
+    CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable)
   }
 
   private def isUpdationRequired(isForceDeletion: Boolean,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6eb64714/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8ebd5a9..c59b1a3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -112,8 +113,7 @@ object AlterTableUtil {
       tablePath: String): Unit = {
     val lockLocation = tablePath
     locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
-      val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR +
-                         lockType
+      val lockFilePath = CarbonTablePath.getLockFilePath(lockLocation, lockType)
       if (carbonLock.releaseLockManually(lockFilePath)) {
         LOGGER.info(s"Alter table lock released successfully: ${ lockType }")
       } else {