You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/29 07:37:39 UTC

carbondata git commit: [CARBONDATA-1804] Support Plug-gable File Operations based on File types

Repository: carbondata
Updated Branches:
  refs/heads/master 9e0fd5ffe -> 33de599a5


[CARBONDATA-1804] Support Plug-gable File Operations based on File types

Refactor FileFactory based on FileType to support pluggable file handlers, so that custom file handlers can implement their own specific logic.
Example: users can provide their own implementations by extending the existing FileTypes.
Refactored FileFactory code: moved the file type operation code into the specific file types instead of checking the file type every time in FileFactory, so that custom file operations can be performed on a specific file type by extending the required file type.

This closes #1560


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/33de599a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/33de599a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/33de599a

Branch: refs/heads/master
Commit: 33de599a56cdd767e5779123a4797178b3c40ae5
Parents: 9e0fd5f
Author: Manohar <ma...@gmail.com>
Authored: Fri Nov 24 12:40:36 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 29 15:37:23 2017 +0800

----------------------------------------------------------------------
 .../filesystem/AbstractDFSCarbonFile.java       | 176 ++++++++-
 .../core/datastore/filesystem/CarbonFile.java   |  47 +++
 .../datastore/filesystem/LocalCarbonFile.java   | 178 +++++++++
 .../core/datastore/impl/DFSFileHolderImpl.java  |   6 +-
 .../datastore/impl/DefaultFileTypeProvider.java |  72 ++++
 .../core/datastore/impl/FileFactory.java        | 373 ++++---------------
 .../core/datastore/impl/FileTypeInerface.java   |  32 ++
 .../carbondata/core/util/CarbonUtilTest.java    |   8 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  11 +-
 9 files changed, 588 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 55b6d64..12466b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -17,9 +17,11 @@
 
 package org.apache.carbondata.core.datastore.filesystem;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -28,9 +30,16 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.GzipCodec;
 
 public abstract  class AbstractDFSCarbonFile implements CarbonFile {
   /**
@@ -39,6 +48,7 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName());
   protected FileStatus fileStatus;
+  public FileSystem fs;
   protected Configuration hadoopConf;
 
   public AbstractDFSCarbonFile(String filePath) {
@@ -49,7 +59,6 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
     this.hadoopConf = hadoopConf;
     filePath = filePath.replace("\\", "/");
     Path path = new Path(filePath);
-    FileSystem fs;
     try {
       fs = path.getFileSystem(this.hadoopConf);
       fileStatus = fs.getFileStatus(path);
@@ -231,4 +240,169 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
     }
     return isFileModified;
   }
+
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, boolean append) throws IOException {
+    Path pt = new Path(path);
+    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    FSDataOutputStream stream = null;
+    if (append) {
+      // append to a file only if file already exists else file not found
+      // exception will be thrown by hdfs
+      if (CarbonUtil.isFileExists(path)) {
+        stream = fs.append(pt, bufferSize);
+      } else {
+        stream = fs.create(pt, true, bufferSize);
+      }
+    } else {
+      stream = fs.create(pt, true, bufferSize);
+    }
+    return stream;
+  }
+
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, Configuration hadoopConf) throws IOException {
+    path = path.replace("\\", "/");
+    boolean gzip = path.endsWith(".gz");
+    boolean bzip2 = path.endsWith(".bz2");
+    InputStream stream;
+    Path pt = new Path(path);
+    FileSystem fs = pt.getFileSystem(hadoopConf);
+    if (bufferSize == -1) {
+      stream = fs.open(pt);
+    } else {
+      stream = fs.open(pt, bufferSize);
+    }
+    String codecName = null;
+    if (gzip) {
+      codecName = GzipCodec.class.getName();
+    } else if (bzip2) {
+      codecName = BZip2Codec.class.getName();
+    }
+    if (null != codecName) {
+      CompressionCodecFactory ccf = new CompressionCodecFactory(hadoopConf);
+      CompressionCodec codec = ccf.getCodecByClassName(codecName);
+      stream = codec.createInputStream(stream);
+    }
+    return new DataInputStream(new BufferedInputStream(stream));
+  }
+
+  /**
+   * return the datainputStream which is seek to the offset of file
+   *
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param offset
+   * @return DataInputStream
+   * @throws IOException
+   */
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long offset) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    FSDataInputStream stream = fs.open(pt, bufferSize);
+    stream.seek(offset);
+    return new DataInputStream(new BufferedInputStream(stream));
+  }
+
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    return fs.create(pt, true);
+  }
+
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long blockSize) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
+  }
+
+  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
+      boolean performFileCheck) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    if (performFileCheck) {
+      return fs.exists(path) && fs.isFile(path);
+    } else {
+      return fs.exists(path);
+    }
+  }
+
+  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.exists(path);
+  }
+
+  @Override public boolean createNewFile(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.createNewFile(path);
+  }
+
+  @Override
+  public boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean doAs,
+      final FsPermission permission) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    boolean result = fs.createNewFile(path);
+    if (null != permission) {
+      fs.setPermission(path, permission);
+    }
+    return result;
+  }
+
+  @Override public boolean deleteFile(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.delete(path, true);
+  }
+
+  @Override public boolean mkdirs(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.mkdirs(path);
+  }
+
+  @Override
+  public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
+    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    return fs.append(pt);
+  }
+
+  @Override public boolean createNewLockFile(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    if (fs.createNewFile(path)) {
+      fs.deleteOnExit(path);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void setPermission(String directoryPath, FsPermission permission, String username,
+      String group) throws IOException {
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 95a084d..94f088b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -17,6 +17,15 @@
 
 package org.apache.carbondata.core.datastore.filesystem;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+
 public interface CarbonFile {
 
   String getAbsolutePath();
@@ -61,4 +70,42 @@ public interface CarbonFile {
    * @return
    */
   boolean isFileModified(long fileTimeStamp, long endOffset);
+
+  DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      boolean append) throws IOException;
+
+  DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      Configuration configuration) throws IOException;
+
+  DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      long offset) throws IOException;
+
+  DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
+      throws IOException;
+
+  DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      long blockSize) throws IOException;
+
+  boolean isFileExist(String filePath, FileFactory.FileType fileType, boolean performFileCheck)
+      throws IOException;
+
+  boolean isFileExist(String filePath, FileFactory.FileType fileType) throws IOException;
+
+  boolean createNewFile(String filePath, FileFactory.FileType fileType) throws IOException;
+
+  boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean doAs,
+      final FsPermission permission) throws IOException;
+
+  boolean deleteFile(String filePath, FileFactory.FileType fileType) throws IOException;
+
+  boolean mkdirs(String filePath, FileFactory.FileType fileType) throws IOException;
+
+  DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType)
+      throws IOException;
+
+  boolean createNewLockFile(String filePath, FileFactory.FileType fileType) throws IOException;
+
+  void setPermission(String directoryPath, FsPermission permission, String username, String group)
+      throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index f6e9f8f..5b2cc68 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -17,12 +17,19 @@
 
 package org.apache.carbondata.core.datastore.filesystem;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.channels.FileChannel;
+import java.util.zip.GZIPInputStream;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -30,7 +37,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 public class LocalCarbonFile implements CarbonFile {
   private static final LogService LOGGER =
@@ -212,6 +222,7 @@ public class LocalCarbonFile implements CarbonFile {
     return isFileModified;
   }
 
+
   @Override public boolean renameForce(String changetoName) {
     File destFile = new File(changetoName);
     if (destFile.exists()) {
@@ -224,4 +235,171 @@ public class LocalCarbonFile implements CarbonFile {
 
   }
 
+  /**
+   * below method will be used to update the file path
+   * for local type
+   * it removes the file:/ from the path
+   *
+   * @param filePath
+   * @return updated file path without url for local
+   */
+  private static String getUpdatedFilePath(String filePath) {
+    if (filePath != null && !filePath.isEmpty()) {
+      // If the store path is relative then convert to absolute path.
+      if (filePath.startsWith("./")) {
+        try {
+          return new File(filePath).getCanonicalPath();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        Path pathWithoutSchemeAndAuthority =
+            Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+        return pathWithoutSchemeAndAuthority.toString();
+      }
+    } else {
+      return filePath;
+    }
+  }
+
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, boolean append) throws FileNotFoundException {
+    path = getUpdatedFilePath(path);
+    return new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
+  }
+
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, Configuration configuration) throws IOException {
+    path = path.replace("\\", "/");
+    boolean gzip = path.endsWith(".gz");
+    boolean bzip2 = path.endsWith(".bz2");
+    InputStream stream;
+    path = FileFactory.getUpdatedFilePath(path, fileType);
+    if (gzip) {
+      stream = new GZIPInputStream(new FileInputStream(path));
+    } else if (bzip2) {
+      stream = new BZip2CompressorInputStream(new FileInputStream(path));
+    } else {
+      stream = new FileInputStream(path);
+    }
+    return new DataInputStream(new BufferedInputStream(stream));
+  }
+
+  /**
+   * return the datainputStream which is seek to the offset of file
+   *
+   * @param path
+   * @param fileType
+   * @param bufferSize
+   * @param offset
+   * @return DataInputStream
+   * @throws IOException
+   */
+  @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long offset) throws IOException {
+    path = path.replace("\\", "/");
+    path = FileFactory.getUpdatedFilePath(path, fileType);
+    FileInputStream fis = new FileInputStream(path);
+    long actualSkipSize = 0;
+    long skipSize = offset;
+    try {
+      while (actualSkipSize != offset) {
+        actualSkipSize += fis.skip(skipSize);
+        skipSize = skipSize - actualSkipSize;
+      }
+    } catch (IOException ioe) {
+      CarbonUtil.closeStream(fis);
+      throw ioe;
+    }
+    return new DataInputStream(new BufferedInputStream(fis));
+  }
+
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
+  }
+
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long blockSize) throws IOException {
+    path = path.replace("\\", "/");
+    path = FileFactory.getUpdatedFilePath(path, fileType);
+    return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path), bufferSize));
+  }
+
+  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType,
+      boolean performFileCheck) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    File defaultFile = new File(filePath);
+
+    if (performFileCheck) {
+      return defaultFile.exists() && defaultFile.isFile();
+    } else {
+      return defaultFile.exists();
+    }
+  }
+
+  @Override public boolean isFileExist(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    File defaultFile = new File(filePath);
+    return defaultFile.exists();
+  }
+
+  @Override public boolean createNewFile(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    File file = new File(filePath);
+    return file.createNewFile();
+  }
+
+  @Override
+  public boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean doAs,
+      final FsPermission permission) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    File file = new File(filePath);
+    return file.createNewFile();
+  }
+
+  @Override public boolean deleteFile(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    File file = new File(filePath);
+    return FileFactory.deleteAllFilesOfDir(file);
+  }
+
+  @Override public boolean mkdirs(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    File file = new File(filePath);
+    return file.mkdirs();
+  }
+
+  @Override
+  public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType)
+      throws IOException {
+    path = path.replace("\\", "/");
+    path = FileFactory.getUpdatedFilePath(path, fileType);
+    return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
+  }
+
+  @Override public boolean createNewLockFile(String filePath, FileFactory.FileType fileType)
+      throws IOException {
+    filePath = filePath.replace("\\", "/");
+    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    File file = new File(filePath);
+    return file.createNewFile();
+  }
+
+  @Override
+  public void setPermission(String directoryPath, FsPermission permission, String username,
+      String group) throws IOException {
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
index f787700..3032ec2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DFSFileHolderImpl.java
@@ -59,7 +59,7 @@ public class DFSFileHolderImpl implements FileHolder {
    * @param filePath fully qualified file path
    * @return channel
    */
-  private FSDataInputStream updateCache(String filePath) throws IOException {
+  public FSDataInputStream updateCache(String filePath) throws IOException {
     FSDataInputStream fileChannel = fileNameAndStreamCache.get(filePath);
     if (null == fileChannel) {
       Path pt = new Path(filePath);
@@ -157,4 +157,8 @@ public class DFSFileHolderImpl implements FileHolder {
     fsDataInputStream.seek(offset);
     return new DataInputStream(new BufferedInputStream(fsDataInputStream, 1 * 1024 * 1024));
   }
+  public Map<String, FSDataInputStream> getFileNameAndStreamCache() {
+    return fileNameAndStreamCache;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
new file mode 100644
index 0000000..67648fe
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.impl;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.filesystem.*;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class DefaultFileTypeProvider implements FileTypeInerface {
+
+  public FileHolder getFileHolder(FileFactory.FileType fileType) {
+    switch (fileType) {
+      case LOCAL:
+        return new FileHolderImpl();
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+      case S3:
+        return new DFSFileHolderImpl();
+      default:
+        return new FileHolderImpl();
+    }
+  }
+
+  public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType) {
+    switch (fileType) {
+      case LOCAL:
+        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
+      case HDFS:
+      case S3:
+        return new HDFSCarbonFile(path);
+      case ALLUXIO:
+        return new AlluxioCarbonFile(path);
+      case VIEWFS:
+        return new ViewFSCarbonFile(path);
+      default:
+        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
+    }
+  }
+
+  public CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration conf) {
+    switch (fileType) {
+      case LOCAL:
+        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
+      case HDFS:
+      case S3:
+        return new HDFSCarbonFile(path, conf);
+      case ALLUXIO:
+        return new AlluxioCarbonFile(path);
+      case VIEWFS:
+        return new ViewFSCarbonFile(path);
+      default:
+        return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 5e65914..2373080 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -17,41 +17,25 @@
 
 package org.apache.carbondata.core.datastore.impl;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.nio.channels.FileChannel;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
-import org.apache.carbondata.core.util.CarbonUtil;
 
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 public final class FileFactory {
   /**
@@ -66,6 +50,10 @@ public final class FileFactory {
     configuration.addResource(new Path("../core-default.xml"));
   }
 
+  private static FileTypeInerface fileFileTypeInerface = new DefaultFileTypeProvider();
+  public static void setFileTypeInerface(FileTypeInerface fileTypeInerface) {
+    fileFileTypeInerface = fileTypeInerface;
+  }
   private FileFactory() {
 
   }
@@ -75,17 +63,7 @@ public final class FileFactory {
   }
 
   public static FileHolder getFileHolder(FileType fileType) {
-    switch (fileType) {
-      case LOCAL:
-        return new FileHolderImpl();
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        return new DFSFileHolderImpl();
-      default:
-        return new FileHolderImpl();
-    }
+    return fileFileTypeInerface.getFileHolder(fileType);
   }
 
   public static FileType getFileType(String path) {
@@ -105,40 +83,14 @@ public final class FileFactory {
   }
 
   public static CarbonFile getCarbonFile(String path) {
-    return getCarbonFile(path, getFileType(path));
+    return fileFileTypeInerface.getCarbonFile(path, getFileType(path));
   }
-
   public static CarbonFile getCarbonFile(String path, FileType fileType) {
-    switch (fileType) {
-      case LOCAL:
-        return new LocalCarbonFile(getUpdatedFilePath(path, fileType));
-      case HDFS:
-      case S3:
-        return new HDFSCarbonFile(path);
-      case ALLUXIO:
-        return new AlluxioCarbonFile(path);
-      case VIEWFS:
-        return new ViewFSCarbonFile(path);
-      default:
-        return new LocalCarbonFile(getUpdatedFilePath(path, fileType));
-    }
+    return fileFileTypeInerface.getCarbonFile(path, fileType);
   }
-
   public static CarbonFile getCarbonFile(String path, FileType fileType,
       Configuration hadoopConf) {
-    switch (fileType) {
-      case LOCAL:
-        return new LocalCarbonFile(getUpdatedFilePath(path, fileType));
-      case HDFS:
-      case S3:
-        return new HDFSCarbonFile(path, hadoopConf);
-      case ALLUXIO:
-        return new AlluxioCarbonFile(path);
-      case VIEWFS:
-        return new ViewFSCarbonFile(path);
-      default:
-        return new LocalCarbonFile(getUpdatedFilePath(path, fileType));
-    }
+    return fileFileTypeInerface.getCarbonFile(path, fileType, hadoopConf);
   }
 
   public static DataInputStream getDataInputStream(String path, FileType fileType)
@@ -150,52 +102,9 @@ public final class FileFactory {
       throws IOException {
     return getDataInputStream(path, fileType, bufferSize, configuration);
   }
-
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
-      Configuration hadoopConf)
-      throws IOException {
-    path = path.replace("\\", "/");
-    boolean gzip = path.endsWith(".gz");
-    boolean bzip2 = path.endsWith(".bz2");
-    InputStream stream;
-    switch (fileType) {
-      case LOCAL:
-        path = getUpdatedFilePath(path, fileType);
-        if (gzip) {
-          stream = new GZIPInputStream(new FileInputStream(path));
-        } else if (bzip2) {
-          stream = new BZip2CompressorInputStream(new FileInputStream(path));
-        } else {
-          stream = new FileInputStream(path);
-        }
-        break;
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(hadoopConf);
-        if (bufferSize == -1) {
-          stream = fs.open(pt);
-        } else {
-          stream = fs.open(pt, bufferSize);
-        }
-        String codecName = null;
-        if (gzip) {
-          codecName = GzipCodec.class.getName();
-        } else if (bzip2) {
-          codecName = BZip2Codec.class.getName();
-        }
-        if (null != codecName) {
-          CompressionCodecFactory ccf = new CompressionCodecFactory(hadoopConf);
-          CompressionCodec codec = ccf.getCodecByClassName(codecName);
-          stream = codec.createInputStream(stream);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("unsupported file system");
-    }
-    return new DataInputStream(new BufferedInputStream(stream));
+      Configuration configuration) throws IOException {
+    return getCarbonFile(path).getDataInputStream(path, fileType, bufferSize, configuration);
   }
 
   /**
@@ -210,102 +119,22 @@ public final class FileFactory {
    */
   public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
       long offset) throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        FSDataInputStream stream = fs.open(pt, bufferSize);
-        stream.seek(offset);
-        return new DataInputStream(new BufferedInputStream(stream));
-      default:
-        path = getUpdatedFilePath(path, fileType);
-        FileInputStream fis = new FileInputStream(path);
-        long actualSkipSize = 0;
-        long skipSize = offset;
-        while (actualSkipSize != offset) {
-          actualSkipSize += fis.skip(skipSize);
-          skipSize = skipSize - actualSkipSize;
-        }
-        return new DataInputStream(new BufferedInputStream(fis));
-    }
+    return getCarbonFile(path).getDataInputStream(path, fileType, bufferSize, offset);
   }
 
   public static DataOutputStream getDataOutputStream(String path, FileType fileType)
       throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        return fs.create(pt, true);
-      default:
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
-    }
+    return getCarbonFile(path).getDataOutputStream(path, fileType);
   }
 
   public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
       boolean append) throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        path = getUpdatedFilePath(path, fileType);
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path, append), bufferSize));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        FSDataOutputStream stream = null;
-        if (append) {
-          // append to a file only if file already exists else file not found
-          // exception will be thrown by hdfs
-          if (CarbonUtil.isFileExists(path)) {
-            stream = fs.append(pt, bufferSize);
-          } else {
-            stream = fs.create(pt, true, bufferSize);
-          }
-        } else {
-          stream = fs.create(pt, true, bufferSize);
-        }
-        return stream;
-      default:
-        path = getUpdatedFilePath(path, fileType);
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
-    }
+    return getCarbonFile(path).getDataOutputStream(path, fileType, bufferSize, append);
   }
 
   public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
       long blockSize) throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        path = getUpdatedFilePath(path, fileType);
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
-      default:
-        path = getUpdatedFilePath(path, fileType);
-        return new DataOutputStream(
-            new BufferedOutputStream(new FileOutputStream(path), bufferSize));
-    }
+    return getCarbonFile(path).getDataOutputStream(path, fileType, bufferSize, blockSize);
   }
 
   /**
@@ -318,31 +147,7 @@ public final class FileFactory {
    */
   public static boolean isFileExist(String filePath, FileType fileType, boolean performFileCheck)
       throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        if (performFileCheck) {
-          return fs.exists(path) && fs.isFile(path);
-        } else {
-          return fs.exists(path);
-        }
-
-      case LOCAL:
-      default:
-        filePath = getUpdatedFilePath(filePath, fileType);
-        File defaultFile = new File(filePath);
-
-        if (performFileCheck) {
-          return defaultFile.exists() && defaultFile.isFile();
-        } else {
-          return defaultFile.exists();
-        }
-    }
+    return getCarbonFile(filePath).isFileExist(filePath, fileType, performFileCheck);
   }
 
   /**
@@ -353,60 +158,21 @@ public final class FileFactory {
    * @param fileType - FileType Local/HDFS
    */
   public static boolean isFileExist(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.exists(path);
-
-      case LOCAL:
-      default:
-        filePath = getUpdatedFilePath(filePath, fileType);
-        File defaultFile = new File(filePath);
-        return defaultFile.exists();
-    }
+    return getCarbonFile(filePath).isFileExist(filePath, fileType);
   }
 
   public static boolean createNewFile(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.createNewFile(path);
-
-      case LOCAL:
-      default:
-        filePath = getUpdatedFilePath(filePath, fileType);
-        File file = new File(filePath);
-        return file.createNewFile();
-    }
+    return createNewFile(filePath, fileType, true, null);
+  }
+  public static boolean createNewFile(
+      String filePath,
+      FileType fileType,
+      boolean doAs,
+      final FsPermission permission) throws IOException {
+    return getCarbonFile(filePath).createNewFile(filePath, fileType, doAs, permission);
   }
-
   public static boolean deleteFile(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.delete(path, true);
-
-      case LOCAL:
-      default:
-        filePath = getUpdatedFilePath(filePath, fileType);
-        File file = new File(filePath);
-        return deleteAllFilesOfDir(file);
-    }
+    return getCarbonFile(filePath).deleteFile(filePath, fileType);
   }
 
   public static boolean deleteAllFilesOfDir(File path) {
@@ -441,21 +207,7 @@ public final class FileFactory {
   }
 
   public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        return fs.mkdirs(path);
-      case LOCAL:
-      default:
-        filePath = getUpdatedFilePath(filePath, fileType);
-        File file = new File(filePath);
-        return file.mkdirs();
-    }
+    return getCarbonFile(filePath).mkdirs(filePath, fileType);
   }
 
   /**
@@ -468,21 +220,7 @@ public final class FileFactory {
    */
   public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
       throws IOException {
-    path = path.replace("\\", "/");
-    switch (fileType) {
-      case LOCAL:
-        path = getUpdatedFilePath(path, fileType);
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true)));
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path pt = new Path(path);
-        FileSystem fs = pt.getFileSystem(configuration);
-        return fs.append(pt);
-      default:
-        return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
-    }
+    return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
   }
 
   /**
@@ -551,25 +289,7 @@ public final class FileFactory {
    * @throws IOException
    */
   public static boolean createNewLockFile(String filePath, FileType fileType) throws IOException {
-    filePath = filePath.replace("\\", "/");
-    switch (fileType) {
-      case HDFS:
-      case ALLUXIO:
-      case VIEWFS:
-      case S3:
-        Path path = new Path(filePath);
-        FileSystem fs = path.getFileSystem(configuration);
-        if (fs.createNewFile(path)) {
-          fs.deleteOnExit(path);
-          return true;
-        }
-        return false;
-      case LOCAL:
-      default:
-        filePath = getUpdatedFilePath(filePath, fileType);
-        File file = new File(filePath);
-        return file.createNewFile();
-    }
+    return getCarbonFile(filePath).createNewLockFile(filePath, fileType);
   }
 
   public enum FileType {
@@ -672,4 +392,65 @@ public final class FileFactory {
     return path.getFileSystem(configuration);
   }
 
+
+  /**
+   * Creates {@code directoryPath} and, for Hadoop-backed file types, sets {@code permission}
+   * on the new directory.
+   *
+   * <p>HDFS, ALLUXIO, VIEWFS and S3 paths are created through the Hadoop FileSystem API; an
+   * existing path is left untouched. All other file types use {@link File#mkdirs()}, where
+   * {@code permission} is not applied and a failed creation is logged rather than thrown.
+   *
+   * @param directoryPath path of the directory to create
+   * @param permission permission applied to a directory created on a Hadoop file system
+   * @throws IOException if a Hadoop file system operation fails
+   */
+  public static void createDirectoryAndSetPermission(String directoryPath, FsPermission permission)
+      throws IOException {
+    FileFactory.FileType fileType = FileFactory.getFileType(directoryPath);
+    switch (fileType) {
+      // Every non-local file type must use the Hadoop API; ALLUXIO and S3 paths would
+      // otherwise fall through to the java.io.File branch below.
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+      case S3:
+        try {
+          Path path = new Path(directoryPath);
+          FileSystem fs = path.getFileSystem(FileFactory.configuration);
+          if (!fs.exists(path)) {
+            fs.mkdirs(path);
+            fs.setPermission(path, permission);
+          }
+        } catch (IOException e) {
+          LOGGER.error("Exception occurred : " + e.getMessage());
+          throw e;
+        }
+        return;
+      case LOCAL:
+      default:
+        directoryPath = FileFactory.getUpdatedFilePath(directoryPath, fileType);
+        File file = new File(directoryPath);
+        if (!file.mkdirs()) {
+          LOGGER.error(" Failed to create directory path " + directoryPath);
+        }
+
+    }
+  }
+
+  /**
+   * Delegates to {@link CarbonFile#setPermission} on the CarbonFile resolved for
+   * {@code directoryPath}.
+   *
+   * @param directoryPath path whose permission is changed
+   * @param permission permission passed through to the CarbonFile implementation
+   * @param username user name passed through to the CarbonFile implementation
+   * @param group group name passed through to the CarbonFile implementation
+   * @throws IOException if the underlying CarbonFile operation fails
+   */
+  public static void setPermission(String directoryPath, FsPermission permission, String username,
+      String group) throws IOException {
+    getCarbonFile(directoryPath).setPermission(directoryPath, permission, username, group);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java
new file mode 100644
index 0000000..4676278
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInerface.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.impl;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory contract that {@link FileFactory} delegates to when it needs a {@link FileHolder}
+ * or {@link CarbonFile} for a given {@link FileFactory.FileType}. Supplying a different
+ * implementation is the hook for plugging in custom handling per file type.
+ */
+public interface FileTypeInerface {
+
+  /**
+   * Returns the {@link FileHolder} to use for files of the given type.
+   *
+   * @param fileType file type to pick the holder for
+   * @return file holder for {@code fileType}
+   */
+  FileHolder getFileHolder(FileFactory.FileType fileType);
+
+  /**
+   * Returns the {@link CarbonFile} for {@code path}.
+   *
+   * @param path file or directory path
+   * @param fileType file type of {@code path}
+   * @return CarbonFile for {@code path}
+   */
+  CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
+
+  /**
+   * Returns the {@link CarbonFile} for {@code path}, with a Hadoop configuration made
+   * available to the implementation.
+   *
+   * @param path file or directory path
+   * @param fileType file type of {@code path}
+   * @param configuration Hadoop configuration supplied by the caller
+   * @return CarbonFile for {@code path}
+   */
+  CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index f9b5ec8..4a555bf 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -310,8 +310,12 @@ public class CarbonUtilTest {
 
   @Test public void testToGetCardinalityFromLevelMetadataFileForInvalidPath()
       throws IOException, InterruptedException {
-    int[] cardinality = CarbonUtil.getCardinalityFromLevelMetadataFile("");
-    assertEquals(cardinality, null);
+    try {
+      int[] cardinality = CarbonUtil.getCardinalityFromLevelMetadataFile("");
+      assertTrue(false);
+    } catch (Exception e) {
+      assertTrue(true);
+    }
   }
 
   @Test public void testToUnescapeChar() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/33de599a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index e99a1a1..3134712 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.spark.sql.CarbonDatasourceHadoopRelation
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.CarbonSource
@@ -488,7 +489,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val timestampFile = basePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
     val timestampFileType = FileFactory.getFileType(timestampFile)
     if (!FileFactory.isFileExist(basePath, timestampFileType)) {
-      FileFactory.mkdirs(basePath, timestampFileType)
+      FileFactory
+        .createDirectoryAndSetPermission(basePath,
+          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
     }
     (timestampFile, timestampFileType)
   }
@@ -516,7 +519,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val (timestampFile, timestampFileType) = getTimestampFileAndType()
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
       LOGGER.audit(s"Creating timestamp file for $timestampFile")
-      FileFactory.createNewFile(timestampFile, timestampFileType)
+      FileFactory
+        .createNewFile(timestampFile,
+          timestampFileType,
+          true,
+          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
     }
     FileFactory.getCarbonFile(timestampFile, timestampFileType)
       .setLastModifiedTime(System.currentTimeMillis())