You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/14 04:18:05 UTC

[06/13] carbondata git commit: [CARBONDATA-2032][DataLoad] directly write carbon data files to HDFS

[CARBONDATA-2032][DataLoad] directly write carbon data files to HDFS

Currently, during data loading, CarbonData writes the final data files to local disk and then copies them to HDFS.
To save disk I/O, CarbonData can skip this step and write these files directly to HDFS.

This closes #1825


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/41190629
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/41190629
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/41190629

Branch: refs/heads/master
Commit: 41190629e76f104447ee0320f8b2a69a40054369
Parents: 07d4da7
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Fri Mar 9 09:50:55 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 14 12:11:11 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |   5 +
 .../filesystem/AbstractDFSCarbonFile.java       |  39 +++-
 .../core/datastore/filesystem/CarbonFile.java   |  29 +++
 .../datastore/filesystem/LocalCarbonFile.java   |  26 ++-
 .../core/datastore/impl/FileFactory.java        |  41 ++++
 .../apache/carbondata/core/util/CarbonUtil.java |  27 +++
 .../hadoop/test/util/StoreCreator.java          |  24 --
 .../dataload/TestLoadDataGeneral.scala          |  20 +-
 .../store/writer/AbstractFactDataWriter.java    | 230 ++++++++++---------
 .../writer/v3/CarbonFactDataWriterImplV3.java   |  31 ++-
 .../processing/util/CarbonLoaderUtil.java       |  11 +-
 .../carbondata/processing/StoreCreator.java     |  23 --
 12 files changed, 322 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 8ff8dc4..823f568 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -134,4 +134,9 @@ public final class CarbonLoadOptionConstants {
    * row delimiter for each sort column bounds
    */
   public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
+
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
+      = "carbon.load.directWriteHdfs.enabled";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index fd5dc40..8cf3efe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -357,7 +357,8 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     }
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
@@ -365,15 +366,26 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     return fs.create(pt, true);
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, long blockSize) throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
+    short replication = pt.getFileSystem(FileFactory.getConfiguration()).getDefaultReplication(pt);
+    return getDataOutputStream(path, fileType, bufferSize, blockSize, replication);
+  }
+
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long blockSize, short replication) throws IOException {
+    path = path.replace("\\", "/");
+    Path pt = new Path(path);
     FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
-    return fs.create(pt, true, bufferSize, fs.getDefaultReplication(pt), blockSize);
+    return fs.create(pt, true, bufferSize, replication, blockSize);
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, String compressor) throws IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
@@ -518,7 +530,8 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
    */
   protected abstract CarbonFile[] getFiles(FileStatus[] listStatus);
 
-  @Override public String[] getLocations() throws IOException {
+  @Override
+  public String[] getLocations() throws IOException {
     BlockLocation[] blkLocations;
     if (fileStatus instanceof LocatedFileStatus) {
       blkLocations = ((LocatedFileStatus)fileStatus).getBlockLocations();
@@ -529,4 +542,20 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
 
     return blkLocations[0].getHosts();
   }
+
+  @Override
+  public boolean setReplication(String filePath, short replication) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.setReplication(path, replication);
+  }
+
+  @Override
+  public short getDefaultReplication(String filePath) throws IOException {
+    filePath = filePath.replace("\\", "/");
+    Path path = new Path(filePath);
+    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+    return fs.getDefaultReplication(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 80c0510..a1d6672 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -106,6 +106,18 @@ public interface CarbonFile {
 
   /**
    * get data output stream
+   * @param path file path
+   * @param fileType file type
+   * @param bufferSize write buffer size
+   * @param blockSize block size
+   * @param replication replication for this file
+   * @return data output stream
+   * @throws IOException if error occurs
+   */
+  DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType, int bufferSize,
+      long blockSize, short replication) throws IOException;
+  /**
+   * get data output stream
    * @param path
    * @param fileType
    * @param bufferSize
@@ -141,4 +153,21 @@ public interface CarbonFile {
    */
   String[] getLocations() throws IOException;
 
+  /**
+   * set the replication factor for this file
+   *
+   * @param filePath file path
+   * @param replication replication
+   * @return true, if success; false, if failed
+   * @throws IOException if error occurs
+   */
+  boolean setReplication(String filePath, short replication) throws IOException;
+
+  /**
+   * get the default replication for this file
+   * @param filePath file path
+   * @return replication factor
+   * @throws IOException if error occurs
+   */
+  short getDefaultReplication(String filePath) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 24022ad..d4ed2b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -309,21 +309,30 @@ public class LocalCarbonFile implements CarbonFile {
     return new DataInputStream(new BufferedInputStream(fis));
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType)
       throws IOException {
     path = path.replace("\\", "/");
     path = FileFactory.getUpdatedFilePath(path, FileFactory.FileType.LOCAL);
     return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path)));
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, long blockSize) throws IOException {
+    return getDataOutputStream(path, fileType, bufferSize, blockSize, (short) 1);
+  }
+
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, long blockSize, short replication) throws IOException {
     path = path.replace("\\", "/");
     path = FileFactory.getUpdatedFilePath(path, fileType);
     return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(path), bufferSize));
   }
 
-  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+  @Override
+  public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
       int bufferSize, String compressor) throws IOException {
     path = path.replace("\\", "/");
     path = FileFactory.getUpdatedFilePath(path, fileType);
@@ -426,4 +435,15 @@ public class LocalCarbonFile implements CarbonFile {
   @Override public String[] getLocations() throws IOException {
     return new String[]{"localhost"};
   }
+
+  @Override
+  public boolean setReplication(String filePath, short replication) throws IOException {
+    // local carbon file does not need replication
+    return true;
+  }
+
+  @Override
+  public short getDefaultReplication(String filePath) throws IOException {
+    return 1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 9bcdfae..ef84fb3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -151,6 +151,21 @@ public final class FileFactory {
   }
 
   /**
+   * get data output stream
+   * @param path file path
+   * @param fileType file type
+   * @param bufferSize write buffer size
+   * @param blockSize block size
+   * @param replication replication
+   * @return data output stream
+   * @throws IOException if error occurs
+   */
+  public static DataOutputStream getDataOutputStream(String path, FileType fileType, int bufferSize,
+      long blockSize, short replication) throws IOException {
+    return getCarbonFile(path).getDataOutputStream(path, fileType, bufferSize, blockSize,
+        replication);
+  }
+  /**
    * get data out put stream
    * @param path
    * @param fileType
@@ -457,4 +472,30 @@ public final class FileFactory {
     }
   }
 
+  /**
+   * set the file replication
+   *
+   * @param path file path
+   * @param fileType file type
+   * @param replication replication
+   * @return true, if success; false, if failed
+   * @throws IOException if error occurs
+   */
+  public static boolean setReplication(String path, FileFactory.FileType fileType,
+      short replication) throws IOException {
+    return getCarbonFile(path, fileType).setReplication(path, replication);
+  }
+
+  /**
+   * get the default replication
+   *
+   * @param path file path
+   * @param fileType file type
+   * @return replication
+   * @throws IOException if error occurs
+   */
+  public static short getDefaultReplication(String path, FileFactory.FileType fileType)
+      throws IOException {
+    return getCarbonFile(path, fileType).getDefaultReplication(path);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index bed6aaa..b961b60 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2448,6 +2448,33 @@ public final class CarbonUtil {
   }
 
   /**
+   * This method will complete the remaining hdfs replications
+   *
+   * @param fileName hdfs file name
+   * @param fileType filetype
+   * @throws CarbonDataWriterException if error occurs
+   */
+  public static void completeRemainingHdfsReplicas(String fileName, FileFactory.FileType fileType)
+    throws CarbonDataWriterException {
+    try {
+      long startTime = System.currentTimeMillis();
+      short replication = FileFactory.getDefaultReplication(fileName, fileType);
+      if (1 == replication) {
+        return;
+      }
+      boolean replicateFlag = FileFactory.setReplication(fileName, fileType, replication);
+      if (!replicateFlag) {
+        LOGGER.error("Failed to set replication for " + fileName + " with factor " + replication);
+      }
+      LOGGER.info(
+          "Total copy time (ms) to copy file " + fileName + " is " + (System.currentTimeMillis()
+              - startTime));
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while completing remaining HDFS backups", e);
+    }
+  }
+
+  /**
    * This method will read the local carbon data file and write to carbon data file in HDFS
    *
    * @param carbonStoreFilePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 1fc0508..8e8916d 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -431,30 +431,6 @@ public class StoreCreator {
 
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
         new ArrayList<LoadMetadataDetails>());
-
-    String segLocation =
-        storeLocation + "/" + databaseName + "/" + tableName + "/Fact/Part0/Segment_0";
-    File file = new File(segLocation);
-    File factFile = null;
-    File[] folderList = file.listFiles();
-    File folder = null;
-    for (int i = 0; i < folderList.length; i++) {
-      if (folderList[i].isDirectory()) {
-        folder = folderList[i];
-      }
-    }
-    if (folder.isDirectory()) {
-      File[] files = folder.listFiles();
-      for (int i = 0; i < files.length; i++) {
-        if (!files[i].isDirectory() && files[i].getName().startsWith("part")) {
-          factFile = files[i];
-          break;
-        }
-      }
-      //      Files.copy(factFile.toPath(), file.toPath(), REPLACE_EXISTING);
-      factFile.renameTo(new File(segLocation + "/" + factFile.getName()));
-      CarbonUtil.deleteFoldersAndFiles(folder);
-    }
   }
 
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index e5075ef..43b215e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.spark.sql.test.util.QueryTest
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
@@ -242,6 +242,24 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     checkAnswer(sql("select * from stale"), Row("k"))
   }
 
+  test("test data loading with directly writing fact data to hdfs") {
+    val originStatus = CarbonProperties.getInstance().getProperty(
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")
+
+    val testData = s"$resourcesPath/sample.csv"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM loadtest"),
+      Seq(Row(6))
+    )
+
+    CarbonProperties.getInstance().addProperty(
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+      originStatus)
+  }
   override def afterEach {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 4064c0d..5783fe5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -17,15 +17,12 @@
 
 package org.apache.carbondata.processing.store.writer;
 
+import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -38,7 +35,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
@@ -65,13 +64,18 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
 
   /**
-   * file channel
+   * file channel to write
    */
-  protected FileChannel fileChannel;
+  protected WritableByteChannel fileChannel;
+  protected long currentOffsetInFile;
+  /**
+   * The path of CarbonData file to write in hdfs
+   */
+  private String carbonDataFileHdfsPath;
   /**
    * The temp path of carbonData file used on executor
    */
-  protected String carbonDataFileTempPath;
+  private String carbonDataFileTempPath;
 
   /**
    * The name of carbonData file (blockId)
@@ -125,7 +129,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    */
   private long currentFileSize;
 
-  protected FileOutputStream fileOutputStream;
+  protected DataOutputStream fileOutputStream;
 
   protected List<BlockIndexInfo> blockIndexInfoList;
 
@@ -143,6 +147,10 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * listener to write data map
    */
   protected DataMapWriterListener listener;
+  /**
+   * Whether directly write fact data to hdfs
+   */
+  private boolean enableDirectlyWriteData2Hdfs = false;
 
   public AbstractFactDataWriter(CarbonFactDataHandlerModel model) {
     this.model = model;
@@ -163,8 +171,19 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
         blockSizeThreshold);
 
+    // whether to directly write fact data to HDFS
+    String directlyWriteData2Hdfs = propInstance.getProperty(
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
+    this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
+    if (enableDirectlyWriteData2Hdfs) {
+      LOGGER.info("Carbondata will directly write fact data to HDFS.");
+    } else {
+      LOGGER.info("Carbondata will write temporary fact data to local disk.");
+    }
+
     this.executorService = Executors.newFixedThreadPool(1,
-        new CarbonThreadFactory("LocalToHDFSCopyPool:" + this.model.getTableName()));
+        new CarbonThreadFactory("CompleteHDFSBackendPool:" + this.model.getTableName()));
     executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // in case of compaction we will pass the cardinality.
     this.localCardinality = this.model.getColCardinality();
@@ -210,10 +229,12 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       throws CarbonDataWriterException {
     if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) {
       // set the current file size to zero
+      String activeFile =
+          enableDirectlyWriteData2Hdfs ? carbonDataFileHdfsPath : carbonDataFileTempPath;
       LOGGER.info("Writing data to file as max file size reached for file: "
-          + carbonDataFileTempPath + " .Data block size: " + currentFileSize);
+          + activeFile + ". Data block size: " + currentFileSize);
       // write meta data to end of the existing file
-      writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
+      writeBlockletInfoToFile();
       this.currentFileSize = 0;
       this.dataChunksOffsets = new ArrayList<>();
       this.dataChunksLength = new ArrayList<>();
@@ -229,7 +250,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   private void notifyDataMapBlockStart() {
     if (listener != null) {
       try {
-        listener.onBlockStart(carbonDataFileName, constructFactFileFullPath());
+        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath);
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing datamap", e);
       }
@@ -247,11 +268,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     blockletId = 0;
   }
 
-  private String constructFactFileFullPath() {
-    String factFilePath =
-        this.model.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName;
-    return factFilePath;
-  }
   /**
    * Finish writing current file. It will flush stream, copy and rename temp file to final file
    * @param copyInCurrentThread set to false if want to do data copy in a new thread
@@ -259,12 +275,23 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    if (copyInCurrentThread) {
-      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
-          carbonDataFileTempPath, model.getCarbonDataDirectoryPath(),
-          fileSizeInBytes);
+    if (enableDirectlyWriteData2Hdfs) {
+      if (copyInCurrentThread) {
+        CarbonUtil.completeRemainingHdfsReplicas(carbonDataFileHdfsPath,
+            FileFactory.FileType.HDFS);
+      } else {
+        executorServiceSubmitList.add(executorService.submit(
+            new CompleteHdfsBackendThread(carbonDataFileHdfsPath, FileFactory.FileType.HDFS)));
+      }
     } else {
-      executorServiceSubmitList.add(executorService.submit(new CopyThread(carbonDataFileTempPath)));
+      if (copyInCurrentThread) {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
+            model.getCarbonDataDirectoryPath(),
+            fileSizeInBytes);
+      } else {
+        executorServiceSubmitList.add(executorService.submit(
+            new CompleteHdfsBackendThread(carbonDataFileTempPath, FileFactory.FileType.LOCAL)));
+      }
     }
   }
 
@@ -274,79 +301,46 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * @throws CarbonDataWriterException
    */
   public void initializeWriter() throws CarbonDataWriterException {
-    // update the filename with new new sequence
-    // increment the file sequence counter
-    initFileCount();
-
-    //each time we initialize writer, we choose a local temp location randomly
-    String[] tempFileLocations = model.getStoreLocation();
-    String chosenTempLocation = tempFileLocations[new Random().nextInt(tempFileLocations.length)];
-    LOGGER.info("Randomly choose factdata temp location: " + chosenTempLocation);
-
     this.carbonDataFileName = CarbonTablePath
         .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
             "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
-    this.carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName;
-    this.fileCount++;
+    this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
+        + carbonDataFileName;
     try {
-      // open channel for new data file
-      fileOutputStream = new FileOutputStream(this.carbonDataFileTempPath, true);
-      this.fileChannel = fileOutputStream.getChannel();
-    } catch (FileNotFoundException fileNotFoundException) {
-      throw new CarbonDataWriterException("Problem while getting the FileChannel for Leaf File",
-          fileNotFoundException);
-    }
-    notifyDataMapBlockStart();
-  }
-
-  private int initFileCount() {
-    int fileInitialCount = 0;
-    FileFilter fileFilter = new FileFilter() {
-      @Override public boolean accept(File pathVal) {
-        if (!pathVal.isDirectory() && pathVal.getName().startsWith(model.getTableName())
-            && pathVal.getName().contains(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
-        }
-        return false;
-      }
-    };
-
-    List<File> dataFileList = new ArrayList<File>();
-    for (String tempLoc : model.getStoreLocation()) {
-      File[] subFiles = new File(tempLoc).listFiles(fileFilter);
-      if (null != subFiles && subFiles.length > 0) {
-        dataFileList.addAll(Arrays.asList(subFiles));
+      if (enableDirectlyWriteData2Hdfs) {
+        // the block size will be twice the block_size specified by user to make sure that
+        // one carbondata file only consists exactly one HDFS block.
+        // Here we write the first replication and will complete the remaining later.
+        fileOutputStream = FileFactory.getDataOutputStream(carbonDataFileHdfsPath,
+            FileFactory.FileType.HDFS, CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2,
+            (short) 1);
+      } else {
+        //each time we initialize writer, we choose a local temp location randomly
+        String[] tempFileLocations = model.getStoreLocation();
+        String chosenTempLocation =
+            tempFileLocations[new Random().nextInt(tempFileLocations.length)];
+        LOGGER.info("Randomly choose factdata temp location: " + chosenTempLocation);
+        carbonDataFileTempPath = chosenTempLocation + File.separator + carbonDataFileName;
+        fileOutputStream = FileFactory.getDataOutputStream(carbonDataFileTempPath,
+            FileFactory.FileType.LOCAL, CarbonCommonConstants.BYTEBUFFER_SIZE, true);
       }
-    }
 
-    File[] dataFiles = new File[dataFileList.size()];
-    dataFileList.toArray(dataFiles);
-    if (dataFiles != null && dataFiles.length > 0) {
-      // since files are in different directory, we should only compare the file name
-      // and ignore the directory
-      Arrays.sort(dataFiles, new Comparator<File>() {
-        @Override public int compare(File o1, File o2) {
-          return o1.getName().compareTo(o2.getName());
-        }
-      });
-      String dataFileName = dataFiles[dataFiles.length - 1].getName();
-      try {
-        fileInitialCount = Integer
-            .parseInt(dataFileName.substring(dataFileName.lastIndexOf('_') + 1).split("\\.")[0]);
-      } catch (NumberFormatException ex) {
-        fileInitialCount = 0;
-      }
-      fileInitialCount++;
+      this.fileCount++;
+      // open channel for new data file
+      this.fileChannel = Channels.newChannel(fileOutputStream);
+      this.currentOffsetInFile = 0;
+    } catch (IOException ex) {
+      throw new CarbonDataWriterException(
+          "Problem while getting the channel for fact data file", ex);
     }
-    return fileInitialCount;
+    notifyDataMapBlockStart();
   }
 
   /**
    * This method will write metadata at the end of file file format in thrift format
    */
-  protected abstract void writeBlockletInfoToFile(
-      FileChannel channel, String filePath) throws CarbonDataWriterException;
+  protected abstract void writeBlockletInfoToFile() throws CarbonDataWriterException;
 
   /**
    * Below method will be used to fill the vlock info details
@@ -395,18 +389,27 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
             model.getSchemaUpdatedTimeStamp());
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
-    // randomly choose a temp location for index file
-    String[] tempLocations = model.getStoreLocation();
-    String chosenTempLocation = tempLocations[new Random().nextInt(tempLocations.length)];
-    LOGGER.info("Randomly choose index file location: " + chosenTempLocation);
+    // the index file name is the same in both modes; only the parent directory differs
+    String baseIndexFileName = CarbonTablePath
+        .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
+            model.getBucketId(), model.getTaskExtension(),
+            "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+    String indexFileName;
+    if (!enableDirectlyWriteData2Hdfs) {
+      // randomly choose a temp location for index file
+      String[] tempLocations = model.getStoreLocation();
+      String chosenTempLocation = tempLocations[new Random().nextInt(tempLocations.length)];
+      LOGGER.info("Randomly choose index file location: " + chosenTempLocation);
+      indexFileName = chosenTempLocation + File.separator + baseIndexFileName;
+    } else {
+      indexFileName = FileFactory.getUpdatedFilePath(
+          model.getCarbonDataDirectoryPath() + File.separator + baseIndexFileName,
+          FileFactory.FileType.HDFS);
+    }

-    String fileName = chosenTempLocation + File.separator + CarbonTablePath
-        .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
-            model.getBucketId(), model.getTaskExtension(),
-            "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     // open file
-    writer.openThriftWriter(fileName);
+    writer.openThriftWriter(indexFileName);
     // write the header first
     writer.writeThrift(indexHeader);
     // write the indexes
@@ -414,10 +417,14 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       writer.writeThrift(blockIndex);
     }
     writer.close();
-    // copy from temp to actual store location
-    CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
-            model.getCarbonDataDirectoryPath(),
-            fileSizeInBytes);
+    if (!enableDirectlyWriteData2Hdfs) {
+      // copy from temp to actual store location
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(indexFileName,
+          model.getCarbonDataDirectoryPath(), fileSizeInBytes);
+    } else {
+      executorServiceSubmitList.add(executorService.submit(
+          new CompleteHdfsBackendThread(indexFileName, FileFactory.FileType.HDFS)));
+    }
   }

   /**
@@ -435,27 +442,33 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
         executorServiceSubmitList.get(i).get();
       }
     } catch (InterruptedException | ExecutionException | IOException e) {
-      LOGGER.error(e, "Error while finishing writer");
-      throw new CarbonDataWriterException(e.getMessage());
+      if (e instanceof InterruptedException) {
+        // restore the interrupt flag so callers can still observe the interruption
+        Thread.currentThread().interrupt();
+      }
+      throw new CarbonDataWriterException(e);
     }
   }



   /**
-   * This method will copy the carbon data file from local store location to
-   * carbon store location
+   * Background task that completes the backend storage of a written file.
+   * For {@code FileType.HDFS} it completes the remaining replicas of the existing file;
+   * otherwise it copies the file from the local temp location to the carbon store path.
    */
-  private final class CopyThread implements Callable<Void> {
+  private final class CompleteHdfsBackendThread implements Callable<Void> {

     /**
      * complete path along with file name which needs to be copied to
      * carbon store path
      */
     private String fileName;
+    private final FileFactory.FileType fileType;

-    private CopyThread(String fileName) {
+    private CompleteHdfsBackendThread(String fileName, FileFactory.FileType fileType) {
       this.fileName = fileName;
+      this.fileType = fileType;
     }

     /**
@@ -464,13 +477,16 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
      * @return computed result
      * @throws Exception if unable to compute a result
      */
-    @Override public Void call() throws Exception {
-      CarbonUtil.copyCarbonDataFileToCarbonStorePath(
-          fileName,
-          model.getCarbonDataDirectoryPath(),
-          fileSizeInBytes);
+    @Override
+    public Void call() throws Exception {
+      if (FileFactory.FileType.HDFS == fileType) {
+        CarbonUtil.completeRemainingHdfsReplicas(fileName, fileType);
+      } else {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
+            model.getCarbonDataDirectoryPath(),
+            fileSizeInBytes);
+      }
       return null;
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 1c9ccc8..3e9be7e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.processing.store.writer.v3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -80,11 +79,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
     blockletDataHolder = new BlockletDataHolder();
   }

-  @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
+  @Override protected void writeBlockletInfoToFile()
       throws CarbonDataWriterException {
     try {
-      // get the current file position
+      // current file position, tracked manually since fileChannel offers no size()
-      long currentPosition = channel.size();
+      long currentPosition = currentOffsetInFile;
       // get thrift file footer instance
       FileFooter3 convertFileMeta = CarbonMetadataUtil
           .convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
@@ -98,7 +97,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       buffer.put(byteArray);
       buffer.putLong(currentPosition);
       buffer.flip();
-      channel.write(buffer);
+      currentOffsetInFile += fileChannel.write(buffer);
     } catch (IOException e) {
       LOGGER.error(e, "Problem while writing the carbon file");
       throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
@@ -178,11 +177,11 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {

     // write data to file
     try {
-      if (fileChannel.size() == 0) {
+      if (currentOffsetInFile == 0) {
         // write the header if file is empty
-        writeHeaderToFile(fileChannel);
+        writeHeaderToFile();
       }
-      writeBlockletToFile(fileChannel, dataChunkBytes);
+      writeBlockletToFile(dataChunkBytes);
       if (listener != null) {
         listener.onBlockletEnd(blockletId++);
       }
@@ -227,12 +226,12 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
   /**
    * write file header
    */
-  private void writeHeaderToFile(FileChannel channel) throws IOException {
+  private void writeHeaderToFile() throws IOException {
     byte[] fileHeader = CarbonUtil.getByteArray(
         CarbonMetadataUtil.getFileHeader(
             true, thriftColumnSchemaList, model.getSchemaUpdatedTimeStamp()));
     ByteBuffer buffer = ByteBuffer.wrap(fileHeader);
-    channel.write(buffer);
+    currentOffsetInFile += fileChannel.write(buffer);
   }

   /**
@@ -243,9 +242,9 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
    * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
    * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
    */
-  private void writeBlockletToFile(FileChannel channel, byte[][] dataChunkBytes)
+  private void writeBlockletToFile(byte[][] dataChunkBytes)
       throws IOException {
-    long offset = channel.size();
+    long offset = currentOffsetInFile;
     // to maintain the offset of each data chunk in blocklet
     List<Long> currentDataChunksOffset = new ArrayList<>();
     // to maintain the length of each data chunk in blocklet
@@ -265,13 +264,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[i].length);
       buffer = ByteBuffer.wrap(dataChunkBytes[i]);
-      channel.write(buffer);
+      currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[i].length;
       for (EncodedTablePage encodedTablePage : encodedTablePages) {
         EncodedColumnPage dimension = encodedTablePage.getDimension(i);
         buffer = dimension.getEncodedData();
         int bufferSize = buffer.limit();
-        channel.write(buffer);
+        currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
       }
     }
@@ -281,14 +280,14 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       currentDataChunksOffset.add(offset);
       currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
       buffer = ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]);
-      channel.write(buffer);
+      currentOffsetInFile += fileChannel.write(buffer);
       offset += dataChunkBytes[dataChunkStartIndex].length;
       dataChunkStartIndex++;
       for (EncodedTablePage encodedTablePage : encodedTablePages) {
         EncodedColumnPage measure = encodedTablePage.getMeasure(i);
         buffer = measure.getEncodedData();
         int bufferSize = buffer.limit();
-        channel.write(buffer);
+        currentOffsetInFile += fileChannel.write(buffer);
         offset += bufferSize;
       }
     }
@@ -360,7 +359,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {

   @Override public void writeFooterToFile() throws CarbonDataWriterException {
     if (this.blockletMetadata.size() > 0) {
-      writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
+      writeBlockletInfoToFile();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index a948538..922a7ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -63,6 +63,7 @@ import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
 import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
 
 public final class CarbonLoaderUtil {
 
@@ -957,9 +958,13 @@ public final class CarbonLoaderUtil {
             infos.add(block);
             nodeCapacity++;
             if (LOGGER.isDebugEnabled()) {
-              LOGGER.debug(
-                  "First Assignment iteration: " + ((TableBlockInfo) block).getFilePath() + '-'
-                      + ((TableBlockInfo) block).getBlockLength() + "-->" + activeExecutor);
+              try {
+                String blockHosts = StringUtils.join(block.getLocations(), ", ");
+                LOGGER.debug(
+                    "First Assignment iteration: block(" + blockHosts + ")-->" + activeExecutor);
+              } catch (IOException e) {
+                LOGGER.error(e);
+              }
             }
             remainingBlocks.remove(block);
           } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41190629/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index e79f003..aae6f03 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -405,29 +405,6 @@ public class StoreCreator {
 
     writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
         new ArrayList<LoadMetadataDetails>());
-
-    String segLocation =
-        storeLocation + "/" + databaseName + "/" + tableName + "/Fact/Part0/Segment_0";
-    File file = new File(segLocation);
-    File factFile = null;
-    File[] folderList = file.listFiles();
-    File folder = null;
-    for (int i = 0; i < folderList.length; i++) {
-      if (folderList[i].isDirectory()) {
-        folder = folderList[i];
-      }
-    }
-    if (folder.isDirectory()) {
-      File[] files = folder.listFiles();
-      for (int i = 0; i < files.length; i++) {
-        if (!files[i].isDirectory() && files[i].getName().startsWith("part")) {
-          factFile = files[i];
-          break;
-        }
-      }
-      factFile.renameTo(new File(segLocation + "/" + factFile.getName()));
-      CarbonUtil.deleteFoldersAndFiles(folder);
-    }
   }
 
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,