You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/07/20 10:13:52 UTC

[24/50] [abbrv] incubator-carbondata git commit: [Bug] Concurrency for dictionary generation (#806)

[Bug] Concurrency for dictionary generation (#806)

Supported single commit point for dictionary, sort index and dictionary metadata files.
New sort index file is created every time and cleared on query max time, to allow parallel read and write.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b466c10f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b466c10f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b466c10f

Branch: refs/heads/master
Commit: b466c10f31876ac4160904a6d1078709dd2a2611
Parents: 9821bee
Author: ashokblend <as...@gmail.com>
Authored: Mon Jul 18 09:43:10 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Mon Jul 18 09:43:10 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/path/CarbonTablePath.java       |  26 +++
 .../core/locks/CarbonLockFactory.java           |  15 +-
 .../org/carbondata/core/locks/HdfsFileLock.java |  28 +--
 .../carbondata/core/locks/LocalFileLock.java    |  55 +++---
 .../org/carbondata/core/locks/LockUsage.java    |   9 +-
 .../carbondata/core/locks/ZooKeeperLocking.java |  27 ++-
 .../CarbonDictionaryMetadataReaderImpl.java     |  20 ++-
 .../core/reader/CarbonDictionaryReaderImpl.java |  24 +--
 .../CarbonDictionarySortIndexReaderImpl.java    |  19 +-
 .../core/writer/CarbonDictionaryWriter.java     |   3 +
 .../core/writer/CarbonDictionaryWriterImpl.java |   9 +-
 .../core/writer/CarbonFooterWriter.java         |  11 +-
 .../CarbonDictionarySortIndexWriterImpl.java    |  56 +++++-
 .../CarbonDictionarySortInfoPreparator.java     |  55 ++++--
 .../dictionary/AbstractDictionaryCacheTest.java |   1 +
 .../core/locks/LocalFileLockTest.java           |   7 +-
 .../core/locks/ZooKeeperLockingTest.java        |   7 +-
 .../writer/CarbonDictionaryWriterImplTest.java  |  15 +-
 .../hadoop/test/util/StoreCreator.java          |   5 +-
 .../execution/command/carbonTableSchema.scala   |  26 +--
 .../spark/rdd/CarbonDataRDDFactory.scala        |  16 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 108 +++++++----
 .../spark/tasks/DictionaryWriterTask.scala      | 114 ++++++++++++
 .../spark/tasks/SortIndexWriterTask.scala       |  62 +++++++
 .../spark/util/GlobalDictionaryUtil.scala       |  81 ---------
 .../datacompaction/DataCompactionLockTest.scala |   2 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala | 177 +++++++++++++++++++
 .../lcm/status/SegmentStatusManager.java        |  17 +-
 28 files changed, 724 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
index 4696927..9ac8802 100644
--- a/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
@@ -152,6 +152,16 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   *
+   * @param columnId
+   * @param dictOffset
+   * @return absolute path of sortindex with appended dictionary offset
+   */
+  public String getSortIndexFilePath(String columnId, long dictOffset) {
+    return getMetaDataDir() + File.separator + columnId + "_" + dictOffset + SORT_INDEX_EXT;
+  }
+
+  /**
    * @return absolute path of schema file
    */
   public String getSchemaFilePath() {
@@ -380,4 +390,20 @@ public class CarbonTablePath extends Path {
       return INVALID_SEGMENT_ID;
     }
   }
+
+  /**
+   * Below method will be used to get sort index file present in mentioned folder
+   *
+   * @param sortIndexDir directory where sort index file resides
+   * @param columnUniqueId   column unique id
+   * @return sort index carbon files
+   */
+  public CarbonFile[] getSortIndexFiles(CarbonFile sortIndexDir, final String columnUniqueId) {
+    CarbonFile[] files = sortIndexDir.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().startsWith(columnUniqueId) && file.getName().endsWith(SORT_INDEX_EXT);
+      }
+    });
+    return files;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/carbondata/core/locks/CarbonLockFactory.java
index 99f4ab1..dcf2c2c 100644
--- a/core/src/main/java/org/carbondata/core/locks/CarbonLockFactory.java
+++ b/core/src/main/java/org/carbondata/core/locks/CarbonLockFactory.java
@@ -18,6 +18,7 @@
  */
 package org.carbondata.core.locks;
 
+import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.util.CarbonProperties;
 
@@ -39,25 +40,25 @@ public class CarbonLockFactory {
   /**
    * This method will determine the lock type.
    *
-   * @param location
-   * @param lockUsage
+   * @param tableIdentifier
+   * @param lockFile
    * @return
    */
-  public static ICarbonLock getCarbonLockObj(String location, LockUsage lockUsage) {
+  public static ICarbonLock getCarbonLockObj(CarbonTableIdentifier tableIdentifier,
+      String lockFile) {
     switch (lockTypeConfigured.toUpperCase()) {
       case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
-        return new LocalFileLock(location, lockUsage);
+        return new LocalFileLock(tableIdentifier, lockFile);
 
       case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
-        return new ZooKeeperLocking(location, lockUsage);
+        return new ZooKeeperLocking(tableIdentifier, lockFile);
 
       case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
-        return new HdfsFileLock(location, lockUsage);
+        return new HdfsFileLock(tableIdentifier, lockFile);
 
       default:
         throw new UnsupportedOperationException("Not supported the lock type");
     }
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
index 745dfa1..e1fdd73 100644
--- a/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/carbondata/core/locks/HdfsFileLock.java
@@ -19,9 +19,10 @@
 package org.carbondata.core.locks;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 
+import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 
 /**
@@ -35,22 +36,23 @@ public class HdfsFileLock extends AbstractCarbonLock {
    */
   private String location;
 
-  /**
-   * lockUsage is used to determine the type of the lock. according to this the lock
-   * folder will change.
-   */
-  private LockUsage lockUsage;
-
   private DataOutputStream dataOutputStream;
 
+  public static String tmpPath;
+
+  static {
+    tmpPath = System.getProperty("hadoop.tmp.dir");
+  }
+
   /**
-   * @param location
-   * @param lockUsage
+   * @param tableIdentifier
+   * @param lockFile
    */
-  public HdfsFileLock(String location, LockUsage lockUsage) {
-    this.location = location;
-    this.lockUsage = lockUsage;
-    this.location = location + File.separator + this.lockUsage;
+  public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
+    this.location =
+        tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
+            + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName()
+            + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
     initRetry();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
index 2b26106..8194bfd 100644
--- a/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/carbondata/core/locks/LocalFileLock.java
@@ -18,7 +18,6 @@
  */
 package org.carbondata.core.locks;
 
-import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -27,6 +26,8 @@ import java.nio.channels.OverlappingFileLockException;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.CarbonTableIdentifier;
+import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 
 /**
@@ -40,12 +41,6 @@ public class LocalFileLock extends AbstractCarbonLock {
   private String location;
 
   /**
-   * lockUsage will determine the lock folder. so that similar locks will try to acquire
-   * same lock file.
-   */
-  private LockUsage lockUsage;
-
-  /**
    * fileOutputStream of the local lock file
    */
   private FileOutputStream fileOutputStream;
@@ -60,11 +55,12 @@ public class LocalFileLock extends AbstractCarbonLock {
    */
   private FileLock fileLock;
 
-  public static final String tmpPath;
-
-  private String cubeName;
+  /**
+   * lock file
+   */
+  private String lockFile;
 
-  private String schemaName;
+  public static final String tmpPath;
 
   /**
    * LOGGER for  logging the messages.
@@ -77,19 +73,14 @@ public class LocalFileLock extends AbstractCarbonLock {
   }
 
   /**
-   * @param location
-   * @param lockUsage
+   * @param tableIdentifier
+   * @param lockFile
    */
-  public LocalFileLock(String location, LockUsage lockUsage) {
-    this.lockUsage = lockUsage;
-    location = location.replace("\\", "/");
-    String tempStr = location.substring(0, location.lastIndexOf('/'));
-    cubeName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
-    tempStr = tempStr.substring(0, tempStr.lastIndexOf('/'));
-    schemaName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
+  public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
     this.location =
-        tmpPath + File.separator + schemaName + File.separator + cubeName + File.separator
-            + this.lockUsage;
+        tmpPath + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
+            + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getTableName();
+    this.lockFile = lockFile;
     initRetry();
   }
 
@@ -100,22 +91,16 @@ public class LocalFileLock extends AbstractCarbonLock {
    */
   @Override public boolean lock() {
     try {
-      String schemaFolderPath = tmpPath + File.separator + schemaName;
-      String cubeFolderPath = schemaFolderPath + File.separator + cubeName;
-      // create dir with schema name in tmp location.
-      if (!FileFactory.isFileExist(schemaFolderPath, FileFactory.getFileType(tmpPath))) {
-        FileFactory.mkdirs(schemaFolderPath, FileFactory.getFileType(tmpPath));
-      }
-
-      // create dir with cube name in tmp location.
-      if (!FileFactory.isFileExist(cubeFolderPath, FileFactory.getFileType(tmpPath))) {
-        FileFactory.mkdirs(cubeFolderPath, FileFactory.getFileType(tmpPath));
+      if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) {
+        FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath));
       }
-      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
-        FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+      String lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR +
+          lockFile;
+      if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) {
+        FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
       }
 
-      fileOutputStream = new FileOutputStream(location);
+      fileOutputStream = new FileOutputStream(lockFilePath);
       channel = fileOutputStream.getChannel();
       try {
         fileLock = channel.tryLock();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/LockUsage.java b/core/src/main/java/org/carbondata/core/locks/LockUsage.java
index f07cfb8..1c70a17 100644
--- a/core/src/main/java/org/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/carbondata/core/locks/LockUsage.java
@@ -22,9 +22,10 @@ package org.carbondata.core.locks;
  * This enum is used to define the usecase of the lock.
  * Each enum value is one specific lock case.
  */
-public enum LockUsage {
-  METADATA_LOCK,
-  COMPACTION_LOCK,
-  TABLE_STATUS_LOCK;
+public class LockUsage {
+  public static String LOCK = ".lock";
+  public static String METADATA_LOCK="meta.lock";
+  public static String COMPACTION_LOCK="compaction.lock";
+  public static String TABLE_STATUS_LOCK="tablestatus.lock";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
index 8b04ac1..47349ad 100644
--- a/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
+++ b/core/src/main/java/org/carbondata/core/locks/ZooKeeperLocking.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.constants.CarbonCommonConstants;
 
 import org.apache.zookeeper.CreateMode;
@@ -68,25 +69,21 @@ public class ZooKeeperLocking extends AbstractCarbonLock {
   private String lockTypeFolder;
 
   /**
-   * @param lockUsage
+   * @param tableIdentifier
+   * @param lockFile
    */
-  public ZooKeeperLocking(String location, LockUsage lockUsage) {
-    this.lockName = CarbonCommonConstants.ZOOKEEPER_LOCK;
-    this.lockTypeFolder = zooKeeperLocation;
-    location = location.replace("\\", "/");
-    String tempStr = location.substring(0, location.lastIndexOf('/'));
-    String schemaName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
-
-    String cubeName = location.substring(location.lastIndexOf('/') + 1, location.length());
-
-    this.tableIdFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + schemaName
-        + '.' + cubeName;
+  public ZooKeeperLocking(CarbonTableIdentifier tableIdentifier, String lockFile) {
+    this.lockName = lockFile;
+    this.tableIdFolder =
+        zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
+            + '.' + tableIdentifier.getTableName();
 
     zk = ZookeeperInit.getInstance().getZookeeper();
 
-    this.lockTypeFolder = zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + schemaName
-        + '.' + cubeName + CarbonCommonConstants.FILE_SEPARATOR
-        + lockUsage.toString();
+    this.lockTypeFolder =
+        zooKeeperLocation + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier.getDatabaseName()
+            + '.' + tableIdentifier.getTableName() + CarbonCommonConstants.FILE_SEPARATOR
+            + lockFile;
     try {
       createBaseNode();
       // if exists returns null then path doesnt exist. so creating.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java b/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
index 512ca8d..a43fc22 100644
--- a/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
+++ b/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryMetadataReaderImpl.java
@@ -146,6 +146,7 @@ public class CarbonDictionaryMetadataReaderImpl implements CarbonDictionaryMetad
   @Override public void close() throws IOException {
     if (null != dictionaryMetadataFileReader) {
       dictionaryMetadataFileReader.close();
+      dictionaryMetadataFileReader = null;
     }
   }
 
@@ -167,14 +168,17 @@ public class CarbonDictionaryMetadataReaderImpl implements CarbonDictionaryMetad
   private void openThriftReader() throws IOException {
     // initialise dictionary file reader which will return dictionary thrift object
     // dictionary thrift object contains a list of byte buffer
-    dictionaryMetadataFileReader =
-        new ThriftReader(this.columnDictionaryMetadataFilePath, new ThriftReader.TBaseCreator() {
-          @Override public TBase create() {
-            return new ColumnDictionaryChunkMeta();
-          }
-        });
-    // Open it
-    dictionaryMetadataFileReader.open();
+    if (null == dictionaryMetadataFileReader) {
+      dictionaryMetadataFileReader =
+          new ThriftReader(this.columnDictionaryMetadataFilePath, new ThriftReader.TBaseCreator() {
+            @Override public TBase create() {
+              return new ColumnDictionaryChunkMeta();
+            }
+          });
+      // Open it
+      dictionaryMetadataFileReader.open();
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryReaderImpl.java b/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryReaderImpl.java
index 1313535..5e5207a 100644
--- a/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryReaderImpl.java
+++ b/core/src/main/java/org/carbondata/core/reader/CarbonDictionaryReaderImpl.java
@@ -144,6 +144,7 @@ public class CarbonDictionaryReaderImpl implements CarbonDictionaryReader {
   @Override public void close() throws IOException {
     if (null != dictionaryFileReader) {
       dictionaryFileReader.close();
+      dictionaryFileReader = null;
     }
   }
 
@@ -294,15 +295,18 @@ public class CarbonDictionaryReaderImpl implements CarbonDictionaryReader {
    * @throws IOException thrift reader open method throws IOException
    */
   private void openThriftReader() throws IOException {
-    // initialise dictionary file reader which will return dictionary thrift object
-    // dictionary thrift object contains a list of byte buffer
-    dictionaryFileReader =
-        new ThriftReader(this.columnDictionaryFilePath, new ThriftReader.TBaseCreator() {
-          @Override public TBase create() {
-            return new ColumnDictionaryChunk();
-          }
-        });
-    // Open dictionary file reader
-    dictionaryFileReader.open();
+    if (null == dictionaryFileReader) {
+      // initialise dictionary file reader which will return dictionary thrift object
+      // dictionary thrift object contains a list of byte buffer
+      dictionaryFileReader =
+          new ThriftReader(this.columnDictionaryFilePath, new ThriftReader.TBaseCreator() {
+            @Override public TBase create() {
+              return new ColumnDictionaryChunk();
+            }
+          });
+      // Open dictionary file reader
+      dictionaryFileReader.open();
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java b/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
index 99431ed..337a857 100644
--- a/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
+++ b/core/src/main/java/org/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImpl.java
@@ -27,7 +27,9 @@ import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.carbon.ColumnIdentifier;
 import org.carbondata.core.carbon.path.CarbonStorePath;
 import org.carbondata.core.carbon.path.CarbonTablePath;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.reader.ThriftReader;
+import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.format.ColumnSortInfo;
 
 import org.apache.thrift.TBase;
@@ -149,8 +151,21 @@ public class CarbonDictionarySortIndexReaderImpl implements CarbonDictionarySort
 
   protected void initPath() {
     CarbonTablePath carbonTablePath =
-         CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-    this.sortIndexFilePath = carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+    String dictionaryPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
+    long dictOffset = CarbonUtil.getFileSize(dictionaryPath);
+    this.sortIndexFilePath =
+        carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
+    try {
+      if (!FileFactory
+          .isFileExist(this.sortIndexFilePath, FileFactory.getFileType(this.sortIndexFilePath))) {
+        this.sortIndexFilePath =
+            carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+      }
+    } catch (IOException e) {
+      this.sortIndexFilePath = carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java b/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java
index 8eef295..e13c32f 100644
--- a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java
+++ b/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriter.java
@@ -57,4 +57,7 @@ public interface CarbonDictionaryWriter extends Closeable {
    * @throws IOException if an I/O error occurs
    */
   void write(List<byte[]> valueList) throws IOException;
+
+
+  void commit() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java
index 474b746..f174f70 100644
--- a/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java
+++ b/core/src/main/java/org/carbondata/core/writer/CarbonDictionaryWriterImpl.java
@@ -203,8 +203,6 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
       writeDictionaryFile();
       // close the thrift writer for dictionary file
       closeThriftWriter();
-      this.chunk_end_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
-      writeDictionaryMetadataFile();
     }
   }
 
@@ -410,4 +408,11 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
     return new CarbonDictionaryMetadataReaderImpl(hdfsStorePath, carbonTableIdentifier,
         columnIdentifier);
   }
+
+  @Override public void commit() throws IOException {
+    if (null != dictionaryThriftWriter) {
+      this.chunk_end_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
+      writeDictionaryMetadataFile();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java b/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java
index 058825d..17b5686 100644
--- a/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java
+++ b/core/src/main/java/org/carbondata/core/writer/CarbonFooterWriter.java
@@ -48,9 +48,14 @@ public class CarbonFooterWriter {
 
     ThriftWriter thriftWriter = openThriftWriter(filePath);
     footer.setVersion(VERSION_NUMBER);
-    thriftWriter.write(footer);
-    thriftWriter.writeOffset(currentPosition);
-    thriftWriter.close();
+    try {
+      thriftWriter.write(footer);
+      thriftWriter.writeOffset(currentPosition);
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      thriftWriter.close();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java b/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
index 1a9553e..89fba44 100644
--- a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
+++ b/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java
@@ -19,14 +19,20 @@
 package org.carbondata.core.writer.sortindex;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
+import org.carbondata.common.ext.PathFactory;
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.carbon.ColumnIdentifier;
-import org.carbondata.core.carbon.path.CarbonStorePath;
 import org.carbondata.core.carbon.path.CarbonTablePath;
+import org.carbondata.core.constants.CarbonCommonConstants;
+import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+import org.carbondata.core.util.CarbonProperties;
 import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.core.writer.ThriftWriter;
 import org.carbondata.format.ColumnSortInfo;
@@ -144,9 +150,51 @@ public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySort
   }
 
   protected void initPath() {
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-    this.sortIndexFilePath = carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId());
+    CarbonTablePath carbonTablePath = PathFactory.getInstance()
+        .getCarbonTablePath(columnIdentifier, carbonStorePath, carbonTableIdentifier);
+    String dictionaryPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId());
+    long dictOffset = CarbonUtil.getFileSize(dictionaryPath);
+    this.sortIndexFilePath =
+        carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset);
+    cleanUpOldSortIndex(carbonTablePath);
+  }
+
+  /**
+   * It cleans up old unused sortindex file
+   *
+   * @param carbonTablePath
+   */
+  private void cleanUpOldSortIndex(CarbonTablePath carbonTablePath) {
+    CarbonFile sortIndexFile =
+        FileFactory.getCarbonFile(sortIndexFilePath, FileFactory.getFileType(sortIndexFilePath));
+    CarbonFile[] files =
+        carbonTablePath.getSortIndexFiles(sortIndexFile.getParentFile(),
+            columnIdentifier.getColumnId());
+    int maxTime;
+    try {
+      maxTime = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+    } catch (NumberFormatException e) {
+      maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
+    }
+    if (null != files) {
+      Arrays.sort(files, new Comparator<CarbonFile>() {
+        @Override public int compare(CarbonFile o1, CarbonFile o2) {
+          return o1.getName().compareTo(o2.getName());
+        }
+      });
+      for (int i = 0; i < files.length - 1; i++) {
+        long difference = System.currentTimeMillis() - files[i].getLastModifiedTime();
+        long minutesElapsed = (difference / (1000 * 60));
+        if (minutesElapsed > maxTime) {
+          if (!files[i].delete()) {
+            LOGGER.warn("Failed to delete sortindex file." + files[i].getAbsolutePath());
+          } else {
+            LOGGER.info("Sort index file is deleted." + files[i].getAbsolutePath());
+          }
+        }
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java b/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
index a5f43d3..595d12c 100644
--- a/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
+++ b/core/src/main/java/org/carbondata/core/writer/sortindex/CarbonDictionarySortInfoPreparator.java
@@ -20,6 +20,7 @@ package org.carbondata.core.writer.sortindex;
 
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 
 import org.carbondata.core.cache.dictionary.Dictionary;
@@ -39,14 +40,16 @@ public class CarbonDictionarySortInfoPreparator {
   /**
    * The method returns the column Sort Info
    *
-   * @param dataType DataType of columns
+   * @param newDistinctValues new distinct value to be added
+   * @param dictionary        old distinct values
+   * @param dataType          DataType of columns
    * @return CarbonDictionarySortInfo returns the column Sort Info
    * @throws CarbonUtilException
    */
-  public CarbonDictionarySortInfo getDictionarySortInfo(Dictionary dictionary, DataType dataType)
-      throws CarbonUtilException {
+  public CarbonDictionarySortInfo getDictionarySortInfo(List<String> newDistinctValues,
+      Dictionary dictionary, DataType dataType) throws CarbonUtilException {
     CarbonDictionarySortModel[] dictionarySortModels =
-        prepareDictionarySortModels(dictionary, dataType);
+        prepareDictionarySortModels(newDistinctValues, dictionary, dataType);
     return createColumnSortInfo(dictionarySortModels);
   }
 
@@ -95,6 +98,7 @@ public class CarbonDictionarySortInfoPreparator {
   /**
    * The method returns the array of CarbonDictionarySortModel
    *
+   * @param distinctValues new distinct values
    * @param dictionary The wrapper wraps the list<list<bye[]>> and provide the
    *                   iterator to retrieve the chunks members.
    * @param dataType   DataType of columns
@@ -102,22 +106,45 @@ public class CarbonDictionarySortInfoPreparator {
    * CarbonDictionarySortModel contains the  member's surrogate and
    * its byte value
    */
-  private CarbonDictionarySortModel[] prepareDictionarySortModels(Dictionary dictionary,
-      DataType dataType) {
+  private CarbonDictionarySortModel[] prepareDictionarySortModels(List<String> distinctValues,
+      Dictionary dictionary, DataType dataType) {
     CarbonDictionarySortModel[] dictionarySortModels = null;
     //The wrapper wraps the list<list<bye[]>> and provide the iterator to
     // retrieve the chunks members.
-    DictionaryChunksWrapper dictionaryChunksWrapper = dictionary.getDictionaryChunks();
-    dictionarySortModels = new CarbonDictionarySortModel[dictionaryChunksWrapper.getSize()];
     int surrogate = 1;
-    while (dictionaryChunksWrapper.hasNext()) {
-      String memberValue = new String(dictionaryChunksWrapper.next(),
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-      CarbonDictionarySortModel dictionarySortModel =
-          new CarbonDictionarySortModel(surrogate, dataType, memberValue);
-      dictionarySortModels[surrogate - 1] = dictionarySortModel;
+    if (null != dictionary) {
+      DictionaryChunksWrapper dictionaryChunksWrapper = dictionary.getDictionaryChunks();
+      dictionarySortModels =
+          new CarbonDictionarySortModel[dictionaryChunksWrapper.getSize() + distinctValues.size()];
+      while (dictionaryChunksWrapper.hasNext()) {
+        dictionarySortModels[surrogate - 1] =
+            createDictionarySortModel(surrogate, dataType, dictionaryChunksWrapper.next());
+        surrogate++;
+      }
+    } else {
+      dictionarySortModels = new CarbonDictionarySortModel[distinctValues.size()];
+    }
+    // for new distinct values
+    Iterator<String> distinctValue = distinctValues.iterator();
+    while (distinctValue.hasNext()) {
+      dictionarySortModels[surrogate - 1] =
+          createDictionarySortModel(surrogate, dataType, distinctValue.next().getBytes());
       surrogate++;
     }
     return dictionarySortModels;
   }
+
+  /**
+   *
+   * @param surrogate
+   * @param dataType
+   * @param value member value
+   * @return CarbonDictionarySortModel
+   */
+  private CarbonDictionarySortModel createDictionarySortModel(int surrogate, DataType dataType,
+      byte[] value) {
+    String memberValue = new String(value, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+    return new CarbonDictionarySortModel(surrogate, dataType, memberValue);
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/test/java/org/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java b/core/src/test/java/org/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
index 3a23d0d..48d3d29 100644
--- a/core/src/test/java/org/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
+++ b/core/src/test/java/org/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java
@@ -121,6 +121,7 @@ public class AbstractDictionaryCacheTest {
       carbonDictionaryWriter.write(valueList);
     } finally {
       carbonDictionaryWriter.close();
+      carbonDictionaryWriter.commit();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/test/java/org/carbondata/core/locks/LocalFileLockTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/locks/LocalFileLockTest.java b/core/src/test/java/org/carbondata/core/locks/LocalFileLockTest.java
index cfdc50e..8e9c17f 100644
--- a/core/src/test/java/org/carbondata/core/locks/LocalFileLockTest.java
+++ b/core/src/test/java/org/carbondata/core/locks/LocalFileLockTest.java
@@ -5,6 +5,8 @@ package org.carbondata.core.locks;
 
 import java.io.File;
 
+import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,12 +33,13 @@ public class LocalFileLockTest {
 
   @Test public void testingLocalFileLockingByAcquiring2Locks() {
 
+	CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("databaseName", "tableName", "tableId");
     LocalFileLock localLock1 =
-        new LocalFileLock((new File(".").getAbsolutePath()) + "/src/test/resources",
+        new LocalFileLock(carbonTableIdentifier,
             LockUsage.METADATA_LOCK);
     Assert.assertTrue(localLock1.lock());
     LocalFileLock localLock2 =
-        new LocalFileLock((new File(".").getAbsolutePath()) + "/src/test/resources",
+        new LocalFileLock(carbonTableIdentifier,
             LockUsage.METADATA_LOCK);
     Assert.assertTrue(!localLock2.lock());
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/test/java/org/carbondata/core/locks/ZooKeeperLockingTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/locks/ZooKeeperLockingTest.java b/core/src/test/java/org/carbondata/core/locks/ZooKeeperLockingTest.java
index 3bf3993..41910a3 100644
--- a/core/src/test/java/org/carbondata/core/locks/ZooKeeperLockingTest.java
+++ b/core/src/test/java/org/carbondata/core/locks/ZooKeeperLockingTest.java
@@ -8,9 +8,11 @@ import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.Properties;
 
+import org.carbondata.core.carbon.CarbonTableIdentifier;
 import org.carbondata.core.util.CarbonProperties;
 
 import mockit.NonStrictExpectations;
+
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
@@ -78,13 +80,14 @@ public class ZooKeeperLockingTest {
 
     ZookeeperInit zki = ZookeeperInit.getInstance("127.0.0.1:" + freePort);
 
+    CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("dbName", "tableName", "tableId");
     ZooKeeperLocking zkl =
-        new ZooKeeperLocking("D:/carbondata/examples/target/store/default/t3/Metadata",
+        new ZooKeeperLocking(tableIdentifier,
             LockUsage.METADATA_LOCK);
     Assert.assertTrue(zkl.lock());
 
     ZooKeeperLocking zk2 = new ZooKeeperLocking(
-        "D:/carbondata/examples/target/store/default/t3/Metadata", LockUsage.METADATA_LOCK);
+    		tableIdentifier, LockUsage.METADATA_LOCK);
     Assert.assertTrue(!zk2.lock());
 
     Assert.assertTrue(zkl.unlock());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/core/src/test/java/org/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/writer/CarbonDictionaryWriterImplTest.java b/core/src/test/java/org/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
index 31822d1..66ad9c7 100644
--- a/core/src/test/java/org/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
+++ b/core/src/test/java/org/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
@@ -245,9 +245,15 @@ public class CarbonDictionaryWriterImplTest {
   private void overwriteDictionaryMetaFile(ColumnDictionaryChunkMeta firstDictionaryChunkMeta,
       String dictionaryFile) throws IOException {
     ThriftWriter thriftMetaChunkWriter = new ThriftWriter(dictionaryFile, false);
-    thriftMetaChunkWriter.open();
-    thriftMetaChunkWriter.write(firstDictionaryChunkMeta);
-    thriftMetaChunkWriter.close();
+    try {
+      thriftMetaChunkWriter.open();
+      thriftMetaChunkWriter.write(firstDictionaryChunkMeta);
+    } catch (IOException e) {
+
+    } finally {
+      thriftMetaChunkWriter.close();
+    }
+
   }
 
   /**
@@ -345,6 +351,7 @@ public class CarbonDictionaryWriterImplTest {
       }
     } finally {
       writer.close();
+      writer.commit();
     }
   }
 
@@ -361,6 +368,8 @@ public class CarbonDictionaryWriterImplTest {
     writer.write(convertStringListToByteArray(dataSet1));
     // close the writer
     writer.close();
+    //write metadata
+    writer.commit();
     // record end offset of file
     long end_offset = CarbonUtil.getFileSize(this.dictionaryFilePath);
     // read dictionary chunk from dictionary file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
index ad21250..c419c27 100644
--- a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
@@ -298,14 +298,15 @@ public class StoreCreator {
         writer.write(value);
       }
       writer.close();
-
+      writer.commit();
       Dictionary dict = (Dictionary) dictCache.get(
           new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
         		  columnIdentifier, dims.get(i).getDataType()));
       CarbonDictionarySortInfoPreparator preparator =
           new CarbonDictionarySortInfoPreparator();
+      List<String> newDistinctValues = new ArrayList<String>();
       CarbonDictionarySortInfo dictionarySortInfo =
-          preparator.getDictionarySortInfo(dict, dims.get(i).getDataType());
+          preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
       CarbonDictionarySortIndexWriter carbonDictionaryWriter =
           new CarbonDictionarySortIndexWriterImpl(
               absoluteTableIdentifier.getCarbonTableIdentifier(), columnIdentifier,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 3da0ff1..82e9921 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1464,10 +1464,19 @@ private[sql] case class LoadCube(
       LOGGER.audit(s"Data loading failed. table not found: $schemaName.$tableName")
       sys.error("Data loading failed. table not found: " + schemaName + "." + tableName)
     }
+
+    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+        .lookupRelation1(Option(schemaName), tableName, None)(sqlContext)
+        .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+        sys.error(s"Table $schemaName.$tableName does not exist")
+    }
     CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
-    val carbonLock = CarbonLockFactory.getCarbonLockObj(org.carbondata.core.
-      carbon.metadata.CarbonMetadata.getInstance().getCarbonTable(schemaName + "_" + tableName).
-      getMetaDataFilepath, LockUsage.METADATA_LOCK)
+    val carbonLock = CarbonLockFactory
+      .getCarbonLockObj(relation.cubeMeta.carbonTable.getAbsoluteTableIdentifier
+        .getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK
+      )
     try {
       if (carbonLock.lockWithRetries()) {
         logInfo("Successfully able to get the table metadata file lock")
@@ -1477,13 +1486,6 @@ private[sql] case class LoadCube(
       }
 
       val factPath = FileUtils.getPaths(CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
-      val relation =
-        CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .lookupRelation1(Option(schemaName), tableName, None)(sqlContext)
-          .asInstanceOf[CarbonRelation]
-      if (relation == null) {
-        sys.error(s"Table $schemaName.$tableName does not exist")
-      }
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.cubeMeta.carbonTableIdentifier.getDatabaseName)
@@ -1836,7 +1838,9 @@ private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Opti
     } else {
       CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
       val carbonLock = CarbonLockFactory
-        .getCarbonLockObj(tmpTable .getMetaDataFilepath, LockUsage.METADATA_LOCK)
+        .getCarbonLockObj(tmpTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.METADATA_LOCK
+        )
       try {
         if (carbonLock.lockWithRetries()) {
           logInfo("Successfully able to get the table metadata file lock")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2cd4e45..12149c5 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -185,7 +185,9 @@ object CarbonDataRDDFactory extends Logging {
 
       // Save the load metadata
       val carbonLock = CarbonLockFactory
-        .getCarbonLockObj(cube.getMetaDataFilepath, LockUsage.METADATA_LOCK)
+        .getCarbonLockObj(cube.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.METADATA_LOCK
+        )
       try {
         if (carbonLock.lockWithRetries()) {
           logInfo("Successfully got the table metadata file lock")
@@ -325,7 +327,9 @@ object CarbonDataRDDFactory extends Logging {
     )
 
     val lock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getMetaDataFilepath, LockUsage.COMPACTION_LOCK)
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.COMPACTION_LOCK
+      )
 
     if (lock.lockWithRetries()) {
       logger
@@ -523,7 +527,9 @@ object CarbonDataRDDFactory extends Logging {
           cubeCreationTime
         )
         val lock = CarbonLockFactory
-          .getCarbonLockObj(carbonTable.getMetaDataFilepath, LockUsage.COMPACTION_LOCK)
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+            LockUsage.COMPACTION_LOCK
+          )
 
         var storeLocation = ""
         var configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
@@ -928,7 +934,9 @@ object CarbonDataRDDFactory extends Logging {
       currentRestructNumber = 0
     }
     val carbonLock = CarbonLockFactory
-      .getCarbonLockObj(cube.getMetaDataFilepath, LockUsage.METADATA_LOCK)
+      .getCarbonLockObj(cube.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        LockUsage.METADATA_LOCK
+      )
     try {
       if (carbonLock.lockWithRetries()) {
         deleteLoadsAndUpdateMetadata(carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index b734257..4a90f08 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -37,9 +37,14 @@ import org.carbondata.core.carbon.metadata.encoder.Encoding
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.datastorage.store.impl.FileFactory
+import org.carbondata.core.locks.CarbonLockFactory
+import org.carbondata.core.locks.LockUsage
 import org.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.carbondata.processing.etl.DataLoadingException
 import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
 import org.carbondata.spark.partition.reader.{CSVParser, CSVReader}
+import org.carbondata.spark.tasks.DictionaryWriterTask
+import org.carbondata.spark.tasks.SortIndexWriterTask
 import org.carbondata.spark.util.GlobalDictionaryUtil
 import org.carbondata.spark.util.GlobalDictionaryUtil._
 
@@ -63,6 +68,8 @@ trait GenericParser {
   def parseString(input: String): Unit
 }
 
+case class DictionaryStats(distinctValues: java.util.List[String],
+    dictWriteTime: Long, sortIndexWriteTime: Long)
 case class PrimitiveParser(dimension: CarbonDimension,
     setOpt: Option[HashSet[String]]) extends GenericParser {
   val (hasDictEncoding, set: HashSet[String]) = setOpt match {
@@ -237,19 +244,11 @@ class CarbonGlobalDictionaryGenerateRDD(
       var dictionaryForDistinctValueLookUp: org.carbondata.core.cache.dictionary.Dictionary = _
       var dictionaryForSortIndexWriting: org.carbondata.core.cache.dictionary.Dictionary = _
       var dictionaryForDistinctValueLookUpCleared: Boolean = false
+      val dictLock = CarbonLockFactory.getCarbonLockObj(model.table,
+        model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)
       // generate distinct value list
       try {
         val t1 = System.currentTimeMillis
-        dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) {
-          CarbonLoaderUtil.getDictionary(model.table,
-            model.columnIdentifier(split.index),
-            model.hdfsLocation,
-            model.primDimensions(split.index).getDataType
-          )
-        } else {
-          null
-        }
-        val t2 = System.currentTimeMillis
         val valuesBuffer = new mutable.HashSet[String]
         val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
         var rowCount = 0L
@@ -273,44 +272,64 @@ class CarbonGlobalDictionaryGenerateRDD(
             }
           }
         }
-
+        val combineListTime = (System.currentTimeMillis() - t1)
         if (isHighCardinalityColumn) {
           LOGGER.info("column " + model.table.getTableUniqueName + "." +
-            model.primDimensions(split.index).getColName + " is high cardinality column")
+                      model.primDimensions(split.index).getColName + " is high cardinality column")
         } else {
-          val t3 = System.currentTimeMillis
-          val distinctValueCount = GlobalDictionaryUtil.generateAndWriteNewDistinctValueList(
-            valuesBuffer, dictionaryForDistinctValueLookUp, model, split.index)
+          if (dictLock.lockWithRetries()) {
+            logInfo(s"Successfully able to get the dictionary lock for ${
+              model.primDimensions(split.index).getColName
+            }")
+          } else {
+            sys
+              .error(s"Dictionary file ${
+                model.primDimensions(split.index).getColName
+              } is locked for updation. Please try after some time")
+          }
+          val t2 = System.currentTimeMillis
+          dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) {
+            CarbonLoaderUtil.getDictionary(model.table,
+              model.columnIdentifier(split.index),
+              model.hdfsLocation,
+              model.primDimensions(split.index).getDataType
+            )
+          } else {
+            null
+          }
+          val dictCacheTime = (System.currentTimeMillis - t2)
+          val t3 = System.currentTimeMillis()
+          val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
+            dictionaryForDistinctValueLookUp,
+            model,
+            split.index)
+          // execute dictionary writer task to get distinct values
+          val distinctValues = dictWriteTask.execute()
+          val dictWriteTime = (System.currentTimeMillis() - t3)
+          val t4 = System.currentTimeMillis()
+          // if new data came than rewrite sort index file
+          if (distinctValues.size() > 0) {
+            val sortIndexWriteTask = new SortIndexWriterTask(model,
+              split.index,
+              dictionaryForDistinctValueLookUp,
+              distinctValues)
+            sortIndexWriteTask.execute()
+          }
+          val sortIndexWriteTime = (System.currentTimeMillis() - t4)
+          // After sortIndex writing, update dictionaryMeta
+          dictWriteTask.updateMetaData()
           // clear the value buffer after writing dictionary data
           valuesBuffer.clear
           org.carbondata.core.util.CarbonUtil
             .clearDictionaryCache(dictionaryForDistinctValueLookUp);
           dictionaryForDistinctValueLookUpCleared = true
-          val t4 = System.currentTimeMillis
-          if (distinctValueCount > 0) {
-            dictionaryForSortIndexWriting = CarbonLoaderUtil.getDictionary(model.table,
-              model.columnIdentifier(split.index),
-              model.hdfsLocation,
-              model.primDimensions(split.index).getDataType)
-            GlobalDictionaryUtil.writeGlobalDictionaryColumnSortInfo(model, split.index,
-              dictionaryForSortIndexWriting)
-            val t5 = System.currentTimeMillis
-            val dicWriteStartTime = t4
-            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvlDicShuffleMaxTime(
-              dicWriteStartTime - dicShuffleStartTime)
-            val dicWriteEndTime = System.currentTimeMillis()
-            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicWriteFileMaxTime(
-              dicWriteEndTime - dicWriteStartTime)
-            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordGlobalDicGenTotalTime(
-              dicWriteEndTime)
-            LOGGER.info("\n columnName:" + model.primDimensions(split.index).getColName +
+          LOGGER.info("\n columnName:" + model.primDimensions(split.index).getColName +
               "\n columnId:" + model.primDimensions(split.index).getColumnId +
-              "\n new distinct values count:" + distinctValueCount +
-              "\n create dictionary cache:" + (t2 - t1) +
-              "\n combine lists:" + (t3 - t2) +
-              "\n sort list, distinct and write:" + (t4 - t3) +
-              "\n write sort info:" + (t5 - t4))
-          }
+              "\n new distinct values count:" + distinctValues.size() +
+              "\n combine lists:" + combineListTime +
+              "\n create dictionary cache:" + dictCacheTime +
+              "\n sort list, distinct and write:" + dictWriteTime +
+              "\n write sort info:" + sortIndexWriteTime)
         }
       } catch {
         case ex: Exception =>
@@ -322,6 +341,17 @@ class CarbonGlobalDictionaryGenerateRDD(
             .clearDictionaryCache(dictionaryForDistinctValueLookUp);
         }
         org.carbondata.core.util.CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting);
+        if (dictLock != null) {
+          if (dictLock.unlock()) {
+            logInfo(s"Dictionary ${
+              model.primDimensions(split.index).getColName
+            } Unlocked Successfully.")
+          } else {
+            logError(s"Unable to unlock Dictionary ${
+              model.primDimensions(split.index).getColName
+            }")
+          }
+        }
       }
       var finished = false
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/main/scala/org/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark/src/main/scala/org/carbondata/spark/tasks/DictionaryWriterTask.scala
new file mode 100644
index 0000000..c526cda
--- /dev/null
+++ b/integration/spark/src/main/scala/org/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.carbondata.spark.tasks
+
+import java.io.IOException
+
+import scala.collection.mutable
+
+import org.carbondata.common.factory.CarbonCommonFactory
+import org.carbondata.core.cache.dictionary.Dictionary
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.writer.CarbonDictionaryWriter
+import org.carbondata.spark.rdd.DictionaryLoadModel
+
+/**
+ *
+ * @param valuesBuffer
+ * @param dictionary
+ * @param model
+ * @param columnIndex
+ * @param writer
+ */
+class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
+    dictionary: Dictionary,
+    model: DictionaryLoadModel, columnIndex: Int,
+    var writer: CarbonDictionaryWriter = null) {
+
+  /**
+   * execute the task
+   *
+   * @return distinctValueList and time taken to write
+   */
+  def execute(): java.util.List[String] = {
+    val values = valuesBuffer.toArray
+    java.util.Arrays.sort(values, Ordering[String])
+    val dictService = CarbonCommonFactory.getDictionaryService
+    writer = dictService.getDictionaryWriter(
+      model.table,
+      model.columnIdentifier(columnIndex),
+      model.hdfsLocation)
+    val distinctValues: java.util.List[String] = new java.util.ArrayList()
+
+    try {
+      if (!model.dictFileExists(columnIndex)) {
+        writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+        distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+      }
+
+      if (values.length >= 1) {
+        var preValue = values(0)
+        if (model.dictFileExists(columnIndex)) {
+          if (dictionary.getSurrogateKey(values(0)) == CarbonCommonConstants
+            .INVALID_SURROGATE_KEY) {
+            writer.write(values(0))
+            distinctValues.add(values(0))
+          }
+          for (i <- 1 until values.length) {
+            if (preValue != values(i)) {
+              if (dictionary.getSurrogateKey(values(i)) ==
+                  CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+                writer.write(values(i))
+                distinctValues.add(values(i))
+                preValue = values(i)
+              }
+            }
+          }
+
+        } else {
+          writer.write(values(0))
+          distinctValues.add(values(0))
+          for (i <- 1 until values.length) {
+            if (preValue != values(i)) {
+              writer.write(values(i))
+              distinctValues.add(values(i))
+              preValue = values(i)
+            }
+          }
+        }
+      }
+    } catch {
+      case ex: IOException =>
+        throw ex
+    }
+    finally {
+      if (null != writer) {
+        writer.close()
+      }
+    }
+    distinctValues
+  }
+
+  /**
+   * update dictionary metadata
+   */
+  def updateMetaData() {
+    if (null != writer) {
+      writer.commit()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/main/scala/org/carbondata/spark/tasks/SortIndexWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/tasks/SortIndexWriterTask.scala b/integration/spark/src/main/scala/org/carbondata/spark/tasks/SortIndexWriterTask.scala
new file mode 100644
index 0000000..debdbd5
--- /dev/null
+++ b/integration/spark/src/main/scala/org/carbondata/spark/tasks/SortIndexWriterTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.carbondata.spark.tasks
+
+import org.carbondata.common.factory.CarbonCommonFactory
+import org.carbondata.core.cache.dictionary.Dictionary
+import org.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter,
+CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
+import org.carbondata.spark.load.CarbonLoaderUtil
+import org.carbondata.spark.rdd.DictionaryLoadModel
+import org.carbondata.spark.rdd.DictionaryStats
+
+/**
+ * This task writes sort index file
+ *
+ * @param model
+ * @param index
+ * @param dictionary
+ * @param dictWriterTask
+ * @param carbonDictionarySortIndexWriter
+ */
+class SortIndexWriterTask(model: DictionaryLoadModel,
+    index: Int,
+    dictionary: Dictionary,
+    distinctValues: java.util.List[String],
+    var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null) {
+  def execute() {
+    try {
+      if (distinctValues.size() > 0) {
+        val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
+        val dictService = CarbonCommonFactory.getDictionaryService
+        val dictionarySortInfo: CarbonDictionarySortInfo =
+          preparator.getDictionarySortInfo(distinctValues, dictionary,
+            model.primDimensions(index).getDataType)
+        carbonDictionarySortIndexWriter =
+          dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index),
+            model.hdfsLocation)
+        carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
+        carbonDictionarySortIndexWriter
+          .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
+      }
+    } finally {
+      if (null != carbonDictionarySortIndexWriter) {
+        carbonDictionarySortIndexWriter.close()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index 56423e7..49d3d5b 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -167,33 +167,6 @@ object GlobalDictionaryUtil extends Logging {
   }
 
   /**
-   * invokes the CarbonDictionarySortIndexWriter to write column sort info
-   * sortIndex and sortIndexInverted data to sortinsex file.
-   *
-   * @param model  carbon Dictionary Model
-   * @param index  index for write
-   * @param dictionary  dictionary has already been generated
-   */
-  def writeGlobalDictionaryColumnSortInfo(model: DictionaryLoadModel,
-      index: Int,
-      dictionary: Dictionary): Unit = {
-    val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
-    val dictService = CarbonCommonFactory.getDictionaryService
-    val dictionarySortInfo: CarbonDictionarySortInfo =
-      preparator.getDictionarySortInfo(dictionary,
-        model.primDimensions(index).getDataType)
-    val carbonDictionaryWriter: CarbonDictionarySortIndexWriter =
-      dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index),
-          model.hdfsLocation)
-    try {
-      carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
-      carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
-    } finally {
-      carbonDictionaryWriter.close()
-    }
-  }
-
-  /**
    * read global dictionary from cache
    */
   def readGlobalDictionaryFromCache(model: DictionaryLoadModel): HashMap[String, Dictionary] = {
@@ -650,58 +623,4 @@ object GlobalDictionaryUtil extends Logging {
         throw ex
     }
   }
-
-  def generateAndWriteNewDistinctValueList(valuesBuffer: mutable.HashSet[String],
-      dictionary: Dictionary,
-      model: DictionaryLoadModel, columnIndex: Int): Int = {
-    val values = valuesBuffer.toArray
-    java.util.Arrays.sort(values, Ordering[String])
-    var distinctValueCount: Int = 0
-    val dictService = CarbonCommonFactory.getDictionaryService
-    val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter(
-        model.table,
-        model.columnIdentifier(columnIndex),
-        model.hdfsLocation)
-    try {
-      if (!model.dictFileExists(columnIndex)) {
-        writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
-        distinctValueCount += 1
-      }
-
-      if (values.length >= 1) {
-        var preValue = values(0)
-        if (model.dictFileExists(columnIndex)) {
-          if (dictionary.getSurrogateKey(values(0)) == CarbonCommonConstants
-            .INVALID_SURROGATE_KEY) {
-            writer.write(values(0))
-            distinctValueCount += 1
-          }
-          for (i <- 1 until values.length) {
-            if (preValue != values(i)) {
-              if (dictionary.getSurrogateKey(values(i)) ==
-                  CarbonCommonConstants.INVALID_SURROGATE_KEY) {
-                writer.write(values(i))
-                preValue = values(i)
-                distinctValueCount += 1
-              }
-            }
-          }
-
-        } else {
-          writer.write(values(0))
-          distinctValueCount += 1
-          for (i <- 1 until values.length) {
-            if (preValue != values(i)) {
-              writer.write(values(i))
-              preValue = values(i)
-              distinctValueCount += 1
-            }
-          }
-        }
-      }
-    } finally {
-      writer.close()
-    }
-    distinctValueCount
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index e121214..f66612e 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -32,7 +32,7 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
   val dataPath: String = carbonTablePath.getMetadataDirectoryPath
 
   val carbonLock: ICarbonLock =
-    CarbonLockFactory.getCarbonLockObj(dataPath, LockUsage.TABLE_STATUS_LOCK)
+    CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
 
 
   override def beforeAll {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
new file mode 100644
index 0000000..60e9281
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.spark.util
+
+import java.io.File
+
+import org.apache.spark.sql.{ CarbonEnv, CarbonRelation }
+import org.apache.spark.sql.common.util.CarbonHiveContext
+import org.apache.spark.sql.common.util.CarbonHiveContext.sql
+import org.apache.spark.sql.common.util.QueryTest
+
+import org.carbondata.core.carbon.{ CarbonDataLoadSchema }
+import org.carbondata.spark.load.CarbonLoadModel
+
+import org.scalatest.BeforeAndAfterAll
+import org.carbondata.core.datastorage.store.impl.FileFactory
+import scala.collection.mutable.ListBuffer
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.Future
+import java.util.concurrent.FutureTask
+import java.util.concurrent.Callable
+import java.util.concurrent.TimeUnit
+import org.carbondata.core.carbon.path.CarbonTablePath
+import org.carbondata.common.ext.PathFactory
+import org.carbondata.core.carbon.ColumnIdentifier
+import org.carbondata.core.util.CarbonProperties
+import org.carbondata.core.constants.CarbonCommonConstants
+
+class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAfterAll {
+
+  var sampleRelation: CarbonRelation = _
+  var workDirectory: String = _
+
+  def buildCarbonLoadModel(relation: CarbonRelation,
+                           filePath: String,
+                           dimensionFilePath: String,
+                           header: String): CarbonLoadModel = {
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getDatabaseName)
+    carbonLoadModel.setDatabaseName(relation.cubeMeta.carbonTableIdentifier.getTableName)
+    // carbonLoadModel.setSchema(relation.cubeMeta.schema)
+    val table = relation.cubeMeta.carbonTable
+    val carbonSchema = new CarbonDataLoadSchema(table)
+    carbonLoadModel.setDatabaseName(table.getDatabaseName)
+    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
+    carbonLoadModel.setFactFilePath(filePath)
+    carbonLoadModel.setDimFolderPath(dimensionFilePath)
+    carbonLoadModel.setCsvHeader(header)
+    carbonLoadModel.setCsvDelimiter(",")
+    carbonLoadModel.setComplexDelimiterLevel1("\\$")
+    carbonLoadModel.setComplexDelimiterLevel2("\\:")
+    carbonLoadModel.setStorePath(relation.cubeMeta.storePath)
+    carbonLoadModel
+  }
+
+  override def beforeAll {
+    buildTestData
+    // second time comment this line
+    buildTable
+    buildRelation
+  }
+
+  def buildTestData() = {
+    workDirectory = new File(this.getClass.getResource("/").getPath + "/../../").getCanonicalPath.replace("\\", "/")
+  }
+  def buildTable() = {
+    try {
+      sql(
+        "CREATE TABLE IF NOT EXISTS employee (empid STRING) STORED BY 'org.apache.carbondata.format'")
+    } catch {
+      case ex: Throwable => logError(ex.getMessage + "\r\n" + ex.getStackTraceString)
+    }
+  }
+
+  def buildRelation() = {
+    val catalog = CarbonEnv.getInstance(CarbonHiveContext).carbonCatalog
+    sampleRelation = catalog.lookupRelation1(Option("default"), "employee", None)(CarbonHiveContext)
+      .asInstanceOf[CarbonRelation]
+  }
+  def writedummydata(filePath: String, recCount: Int) = {
+    var a: Int = 0
+    var records: StringBuilder = StringBuilder.newBuilder
+    for (a <- 0 to recCount) {
+      records.append(a).append("\n")
+    }
+    val dis = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath))
+    dis.writeBytes(records.toString())
+    dis.close()
+  }
+  test("concurrent dictionary generation") {
+    CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "-1")
+    val noOfFiles = 5
+    val files = new ListBuffer[String]()
+    val loadModels = new ListBuffer[CarbonLoadModel]()
+    for (i <- 0 until noOfFiles) {
+      val filePath: String = workDirectory + s"/src/test/resources/singlecolumn_${10 * (i + 1)}.csv"
+      files += filePath
+      loadModels += buildCarbonLoadModel(sampleRelation, filePath, null, "empid")
+      writedummydata(filePath, 10 * (i + 1))
+    }
+    try {
+      val dictGenerators = new java.util.ArrayList[Callable[String]](noOfFiles)
+      for (i <- 0 until noOfFiles) {
+        dictGenerators.add(new DictGenerator(loadModels(i)))
+      }
+      val executorService = Executors.newFixedThreadPool(10);
+      val results = executorService.invokeAll(dictGenerators);
+      for (i <- 0 until noOfFiles) {
+        val res = results.get(i).get
+        assert("Pass".equals(res))
+      }
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        assert(false)
+    }
+    val carbonTableIdentifier = sampleRelation.cubeMeta.carbonTable.getCarbonTableIdentifier
+    val columnIdentifier = sampleRelation.cubeMeta.carbonTable.getDimensionByName("employee", "empid").getColumnIdentifier
+    val carbonTablePath = PathFactory.getInstance()
+        .getCarbonTablePath(columnIdentifier, sampleRelation.cubeMeta.storePath, carbonTableIdentifier);
+    val dictPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId)
+    val dictFile = FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath))
+    val offSet = dictFile.getSize
+    val sortIndexPath = carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId, offSet)
+    val sortIndexFile = FileFactory.getCarbonFile(sortIndexPath, FileFactory.getFileType(sortIndexPath))
+    assert(sortIndexFile.exists())
+    val sortIndexFiles = carbonTablePath.getSortIndexFiles(sortIndexFile.getParentFile, columnIdentifier.getColumnId)
+    assert(sortIndexFiles.length == 2)
+    deleteFiles(files)
+  }
+
+  def deleteFiles(files: ListBuffer[String]) {
+    for (i <- 0 until files.length) {
+      val file = FileFactory.getCarbonFile(files(i), FileFactory.getFileType(files(i)))
+      file.delete()
+    }
+  }
+  override def afterAll {
+    sql("drop table if exists employee")
+    CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
+        Integer.toString(CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME))
+  }
+  class DictGenerator(loadModel: CarbonLoadModel) extends Callable[String] {
+   override def call:String = {
+     var result = "Pass"
+      try {
+        GlobalDictionaryUtil
+          .generateGlobalDictionary(CarbonHiveContext,
+            loadModel,
+            sampleRelation.cubeMeta.storePath)
+      } catch {
+        case ex: Exception => 
+          result = ex.getMessage
+          ex.printStackTrace()
+      }
+      result
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b466c10f/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
index e8ed31f..d5b06f6 100644
--- a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
@@ -74,11 +74,8 @@ public class SegmentStatusManager {
    * @return
    */
   public ICarbonLock getTableStatusLock() {
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-    String metaDataFilepath = carbonTablePath.getMetadataDirectoryPath();
-    return CarbonLockFactory.getCarbonLockObj(metaDataFilepath, LockUsage.TABLE_STATUS_LOCK);
+    return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+        LockUsage.TABLE_STATUS_LOCK);
   }
 
   /**
@@ -243,8 +240,9 @@ public class SegmentStatusManager {
    * @return
    */
   public List<String> updateDeletionStatus(List<String> loadIds, String cubeFolderPath) {
-    ICarbonLock carbonLock =
-        CarbonLockFactory.getCarbonLockObj(cubeFolderPath, LockUsage.METADATA_LOCK);
+    ICarbonLock carbonLock = CarbonLockFactory
+        .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+            LockUsage.METADATA_LOCK);
     List<String> invalidLoadIds = new ArrayList<String>(0);
     try {
       if (carbonLock.lockWithRetries()) {
@@ -300,8 +298,9 @@ public class SegmentStatusManager {
    */
   public List<String> updateDeletionStatus(String loadDate, String tableFolderPath,
       Long loadStartTime) {
-    ICarbonLock carbonLock =
-        CarbonLockFactory.getCarbonLockObj(tableFolderPath, LockUsage.METADATA_LOCK);
+    ICarbonLock carbonLock = CarbonLockFactory
+        .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+            LockUsage.METADATA_LOCK);
     List<String> invalidLoadTimestamps = new ArrayList<String>(0);
     try {
       if (carbonLock.lockWithRetries()) {