Posted to commits@carbondata.apache.org by zh...@apache.org on 2019/05/22 01:46:22 UTC

[carbondata] branch master updated: [CARBONDATA-3363] SDK supports reading carbon data from given file lists, file or folder

This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7541ef2  [CARBONDATA-3363] SDK supports reading carbon data from given file lists, file or folder
7541ef2 is described below

commit 7541ef25f50e5ba6d70cb2338123631365777fbf
Author: xubo245 <xu...@huawei.com>
AuthorDate: Mon Jan 28 20:27:30 2019 +0800

    [CARBONDATA-3363] SDK supports reading carbon data from given file lists, file or folder
    
    The SDK can now read carbon data from a given list of files, from a single file, or from a folder.
    
    This closes #3194
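    
    For reference, a minimal sketch of the three new read paths introduced
    here, pieced together from the example and tests in this diff (the
    locations are placeholders; the raw List type matches the patch):
    
        import java.util.List;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.carbondata.core.util.path.CarbonTablePath;
        import org.apache.carbondata.sdk.file.CarbonReader;
        import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
    
        // list all .carbondata files under a folder (hypothetical location)
        List files = listFiles("/tmp/table1", CarbonTablePath.CARBON_DATA_EXT,
            new Configuration());
    
        // 1. read an explicit list of carbondata files
        CarbonReader r1 = CarbonReader.builder().withFileLists(files).build();
    
        // 2. read a single carbondata file
        CarbonReader r2 = CarbonReader.builder()
            .withFile(files.get(0).toString()).build();
    
        // 3. read a whole folder (withFolder sets the table path)
        CarbonReader r3 = CarbonReader.builder().withFolder("/tmp/table1").build();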
---
 .../carbondata/examples/sdk/SDKS3ReadExample.java  | 111 ++++++-----
 .../hadoop/api/CarbonFileInputFormat.java          |  33 +++-
 .../carbondata/hadoop/api/CarbonInputFormat.java   |   5 +
 .../apache/carbondata/sdk/file/CarbonReader.java   |  11 ++
 .../carbondata/sdk/file/CarbonReaderBuilder.java   |  75 ++++++-
 .../apache/carbondata/sdk/file/utils/SDKUtil.java  |   2 +-
 .../org/apache/carbondata/sdk/file/ImageTest.java  | 220 +++++++++++++++++++++
 7 files changed, 404 insertions(+), 53 deletions(-)

diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
index 94e4c8d..92afd0b 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
@@ -17,16 +17,20 @@
 
 package org.apache.carbondata.examples.sdk;
 
+import java.util.List;
+
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.sdk.file.CarbonReader;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
+import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
 import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
@@ -35,60 +39,69 @@ import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
  * Example for testing carbonReader on S3
  */
 public class SDKS3ReadExample {
-    public static void main(String[] args) throws Exception {
-        Logger logger = LogServiceFactory.getLogService(SDKS3ReadExample.class.getName());
-        if (args == null || args.length < 3) {
-            logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>"
-                + "<s3-endpoint> [table-path-on-s3]");
-            System.exit(0);
-        }
+  public static void main(String[] args) throws Exception {
+    Logger logger = LogServiceFactory.getLogService(SDKS3ReadExample.class.getName());
+    if (args == null || args.length < 3) {
+      logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>"
+          + "<s3-endpoint> [table-path-on-s3]");
+      System.exit(0);
+    }
+
+    String path = "s3a://sdk/WriterOutput/carbondata5";
+    if (args.length > 3) {
+      path = args[3];
+    }
+
+    // 1. read with file list
+    Configuration conf = new Configuration();
 
-        String path = "s3a://sdk/WriterOutput/carbondata5";
-        if (args.length > 3) {
-            path=args[3];
-        }
+    conf.set(ACCESS_KEY, args[0]);
+    conf.set(SECRET_KEY, args[1]);
+    conf.set(ENDPOINT, args[2]);
+    List fileLists = listFiles(path, CarbonTablePath.CARBON_DATA_EXT, conf);
 
-        // Read data
-        EqualToExpression equalToExpression = new EqualToExpression(
-            new ColumnExpression("name", DataTypes.STRING),
-            new LiteralExpression("robot1", DataTypes.STRING));
+    // Read data
+    EqualToExpression equalToExpression = new EqualToExpression(
+        new ColumnExpression("name", DataTypes.STRING),
+        new LiteralExpression("robot1", DataTypes.STRING));
 
-        CarbonReader reader = CarbonReader
-            .builder(path, "_temp")
-            .projection(new String[]{"name", "age"})
-            .filter(equalToExpression)
-            .withHadoopConf(ACCESS_KEY, args[0])
-            .withHadoopConf(SECRET_KEY, args[1])
-            .withHadoopConf(ENDPOINT, args[2])
-            .build();
+    CarbonReader reader = CarbonReader
+        .builder()
+        .projection(new String[]{"name", "age"})
+        .filter(equalToExpression)
+        .withHadoopConf(ACCESS_KEY, args[0])
+        .withHadoopConf(SECRET_KEY, args[1])
+        .withHadoopConf(ENDPOINT, args[2])
+        .withFileLists(fileLists)
+        .build();
 
-        System.out.println("\nData:");
-        int i = 0;
-        while (i < 20 && reader.hasNext()) {
-            Object[] row = (Object[]) reader.readNextRow();
-            System.out.println(row[0] + " " + row[1]);
-            i++;
-        }
-        System.out.println("\nFinished");
-        reader.close();
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      System.out.println(row[0] + " " + row[1]);
+      i++;
+    }
+    System.out.println("\nFinished");
+    reader.close();
 
-        // Read without filter
-        CarbonReader reader2 = CarbonReader
-            .builder(path, "_temp")
-            .projection(new String[]{"name", "age"})
-            .withHadoopConf(ACCESS_KEY, args[0])
-            .withHadoopConf(SECRET_KEY, args[1])
-            .withHadoopConf(ENDPOINT, args[2])
-            .build();
+    // Read without filter
+    CarbonReader reader2 = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age"})
+        .withHadoopConf(ACCESS_KEY, args[0])
+        .withHadoopConf(SECRET_KEY, args[1])
+        .withHadoopConf(ENDPOINT, args[2])
+        .build();
 
-        System.out.println("\nData:");
-        i = 0;
-        while (i < 20 && reader2.hasNext()) {
-            Object[] row = (Object[]) reader2.readNextRow();
-            System.out.println(row[0] + " " + row[1]);
-            i++;
-        }
-        System.out.println("\nFinished");
-        reader2.close();
+    System.out.println("\nData:");
+    i = 0;
+    while (i < 20 && reader2.hasNext()) {
+      Object[] row = (Object[]) reader2.readNextRow();
+      System.out.println(row[0] + " " + row[1]);
+      i++;
     }
+    System.out.println("\nFinished");
+    reader2.close();
+  }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 6051d4f..e83f898 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -150,7 +150,17 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
         for (LoadMetadataDetails load : loadMetadataDetails) {
           seg = new Segment(load.getLoadName(), null, readCommittedScope);
-          externalTableSegments.add(seg);
+          if (fileLists != null) {
+            for (int i = 0; i < fileLists.size(); i++) {
+              if (fileLists.get(i).toString().endsWith(seg.getSegmentNo()
+                  + CarbonTablePath.CARBON_DATA_EXT)) {
+                externalTableSegments.add(seg);
+                break;
+              }
+            }
+          } else {
+            externalTableSegments.add(seg);
+          }
         }
       }
       List<InputSplit> splits = new ArrayList<>();
@@ -162,7 +172,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         // do block filtering and get split
         splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
       } else {
-        for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
+        List<CarbonFile> carbonFiles = null;
+        if (null != this.fileLists) {
+          carbonFiles = getAllCarbonDataFiles(this.fileLists);
+        } else {
+          carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
+        }
+
+        for (CarbonFile carbonFile : carbonFiles) {
           // Segment id is set to null because SDK does not write carbondata files with respect
           // to segments. So no specific name is present for this load.
           CarbonInputSplit split =
@@ -208,6 +225,18 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     return carbonFiles;
   }
 
+  private List<CarbonFile> getAllCarbonDataFiles(List fileLists) {
+    List<CarbonFile> carbonFiles = new LinkedList<CarbonFile>();
+    try {
+      for (int i = 0; i < fileLists.size(); i++) {
+        carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString()));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return carbonFiles;
+  }
+
   /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
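
The new branch above keeps an external-table segment only when some entry in
fileLists ends with that segment's number plus the carbondata extension. A
minimal standalone sketch of that check (not the actual patch code), assuming
the same raw List typing as the patch:

    import java.util.List;

    // A segment survives the filter only if some listed file name ends with
    // "<segmentNo>" + CARBON_DATA_EXT (".carbondata").
    static boolean segmentInFileLists(List fileLists, String segmentNo, String ext) {
      for (int i = 0; i < fileLists.size(); i++) {
        if (fileLists.get(i).toString().endsWith(segmentNo + ext)) {
          return true;
        }
      }
      return false;
    }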
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index e31b5b1..9005f05 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -132,6 +132,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   protected int numStreamFiles = 0;
   protected int hitedStreamFiles = 0;
   protected int numBlocks = 0;
+  protected List fileLists = null;
 
   private CarbonTable carbonTable;
 
@@ -156,6 +157,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return numBlocks;
   }
 
+  public void setFileLists(List fileLists) {
+    this.fileLists = fileLists;
+  }
+
   /**
    * Set the `tableInfo` in `configuration`
    */
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 6ed6624..6e43f7f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -171,6 +171,17 @@ public class CarbonReader<T> {
   }
 
   /**
+   * Return a new {@link CarbonReaderBuilder} instance
+   *
+   * @return CarbonReaderBuilder object
+   */
+  public static CarbonReaderBuilder builder() {
+    UUID uuid = UUID.randomUUID();
+    String tableName = "UnknownTable" + uuid;
+    return new CarbonReaderBuilder(tableName);
+  }
+
+  /**
    * Breaks the list of CarbonRecordReader in CarbonReader into multiple
    * CarbonReader objects, each iterating through some 'carbondata' files
    * and return that list of CarbonReader objects
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index ef79b05..6ead50d 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -63,6 +64,7 @@ public class CarbonReaderBuilder {
   private boolean useVectorReader = true;
   private InputSplit inputSplit;
   private boolean useArrowReader;
+  private List fileLists;
 
   /**
    * Construct a CarbonReaderBuilder with table path and table name
@@ -82,6 +84,54 @@ public class CarbonReaderBuilder {
   }
 
   /**
+   * Construct a CarbonReaderBuilder with table name
+   *
+   * @param tableName table name
+   */
+  CarbonReaderBuilder(String tableName) {
+    this.tableName = tableName;
+    ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
+  }
+
+  /**
+   * set carbonData file folder
+   *
+   * @param tablePath table path
+   * @return CarbonReaderBuilder object
+   */
+  public CarbonReaderBuilder withFolder(String tablePath) {
+    this.tablePath = tablePath;
+    return this;
+  }
+
+  /**
+   * set carbondata file lists
+   *
+   * @param fileLists carbondata file lists
+   * @return CarbonReaderBuilder object
+   */
+  public CarbonReaderBuilder withFileLists(List fileLists) {
+    if (null == this.fileLists) {
+      this.fileLists = fileLists;
+    } else {
+      this.fileLists.addAll(fileLists);
+    }
+    return this;
+  }
+
+  /**
+   * set one carbondata file
+   *
+   * @param file carbondata file
+   * @return CarbonReaderBuilder object
+   */
+  public CarbonReaderBuilder withFile(String file) {
+    List fileLists = new ArrayList();
+    fileLists.add(file);
+    return withFileLists(fileLists);
+  }
+
+  /**
    * Configure the projection column names of carbon reader
    *
    * @param projectionColumnNames projection column names
@@ -190,7 +240,27 @@ public class CarbonReaderBuilder {
           ((CarbonInputSplit) inputSplit).getSegment().getReadCommittedScope().getFilePath();
       tableName = "UnknownTable" + UUID.randomUUID();
     }
-    CarbonTable table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
+    CarbonTable table;
+    // now always infer schema. TODO:Refactor in next version.
+    if (null == this.fileLists && null == tablePath) {
+      throw new IllegalArgumentException("Please set table path first.");
+    }
+    if (null != this.fileLists) {
+      if (fileLists.size() < 1) {
+        throw new IllegalArgumentException("fileLists must contain at least one file!");
+      }
+      String commonString = String.valueOf(fileLists.get(0));
+      for (int i = 1; i < fileLists.size(); i++) {
+        commonString = commonString.substring(0, StringUtils.indexOfDifference(commonString,
+            String.valueOf(fileLists.get(i))));
+      }
+      int index = commonString.lastIndexOf("/");
+      commonString = commonString.substring(0, index);
+
+      table = CarbonTable.buildTable(commonString, tableName, hadoopConf);
+    } else {
+      table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
+    }
     if (enableBlockletDistribution) {
       // set cache level to blocklet level
       Map<String, String> tableProperties =
@@ -206,6 +276,9 @@ public class CarbonReaderBuilder {
     if (filterExpression != null) {
       format.setFilterPredicates(job.getConfiguration(), filterExpression);
     }
+    if (null != this.fileLists) {
+      format.setFileLists(this.fileLists);
+    }
     if (projectionColumns != null) {
       // set the user projection
       int len = projectionColumns.length;
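
When only a file list is supplied, build() infers the table path by shrinking
the first entry to the longest prefix it shares with every other entry
(StringUtils.indexOfDifference) and then cutting back to the last '/'. A
self-contained sketch of that inference; note indexOfDifference returns -1 for
two identical strings, a case neither this sketch nor the patch guards against:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.commons.lang3.StringUtils;

    public class TablePathInference {
      // Mirrors build(): longest common prefix of all entries, then the
      // parent folder of that prefix.
      static String inferTablePath(List files) {
        String common = String.valueOf(files.get(0));
        for (int i = 1; i < files.size(); i++) {
          common = common.substring(0,
              StringUtils.indexOfDifference(common, String.valueOf(files.get(i))));
        }
        return common.substring(0, common.lastIndexOf("/"));
      }

      public static void main(String[] args) {
        // Prints "/data/t1" (placeholder paths).
        System.out.println(inferTablePath(Arrays.asList(
            "/data/t1/part-0.carbondata", "/data/t1/part-1.carbondata")));
      }
    }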
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
index 9fec185..6533fab 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
@@ -34,7 +34,7 @@ public class SDKUtil {
                                     final String suf, Configuration conf) throws Exception {
     final String sufImageFinal = suf;
     ArrayList result = new ArrayList();
-    CarbonFile[] fileList = FileFactory.getCarbonFile(sourceImageFolder).listFiles();
+    CarbonFile[] fileList = FileFactory.getCarbonFile(sourceImageFolder, conf).listFiles();
     for (int i = 0; i < fileList.length; i++) {
       if (fileList[i].isDirectory()) {
         result.addAll(listFiles(fileList[i].getCanonicalPath(), sufImageFinal, conf));
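
The one-line SDKUtil change threads the caller's Configuration through to
FileFactory.getCarbonFile, so credentials set for an object store actually
reach the listing call. A usage sketch under that fix, inside a method that
declares throws Exception (keys, endpoint and bucket are placeholders):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.carbondata.core.util.path.CarbonTablePath;
    import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
    import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
    import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
    import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;

    Configuration conf = new Configuration();
    conf.set(ACCESS_KEY, "<access-key>");  // placeholder
    conf.set(SECRET_KEY, "<secret-key>");  // placeholder
    conf.set(ENDPOINT, "<s3-endpoint>");   // placeholder
    // With the fix, this conf (and its S3 credentials) is honored while listing:
    List files = listFiles("s3a://bucket/table", CarbonTablePath.CARBON_DATA_EXT, conf);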
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index e69a981..3bfea26 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -26,11 +26,13 @@ import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.util.BinaryUtil;
 
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -43,6 +45,7 @@ import javax.imageio.stream.ImageInputStream;
 import java.awt.color.ColorSpace;
 import java.awt.image.BufferedImage;
 import java.io.*;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -815,4 +818,221 @@ public class ImageTest extends TestCase {
     reader.close();
   }
 
+  @Test
+  public void testBinaryWithProjectionAndFileListsAndWithFile() throws Exception {
+    int num = 5;
+    String path = "./target/flowersFolder";
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("imageId", DataTypes.INT);
+    fields[1] = new Field("imageName", DataTypes.STRING);
+    fields[2] = new Field("imageBinary", DataTypes.BINARY);
+    fields[3] = new Field("txtName", DataTypes.STRING);
+    fields[4] = new Field("txtContent", DataTypes.STRING);
+
+    String imageFolder = "./src/test/resources/image/flowers";
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .build();
+      ArrayList files = listFiles(imageFolder, ".jpg");
+
+      if (null != files) {
+        for (int i = 0; i < files.size(); i++) {
+          // read image and encode to Hex
+          BufferedInputStream bis = new BufferedInputStream(new FileInputStream(files.get(i).toString()));
+          char[] hexValue = null;
+          originBinary = new byte[bis.available()];
+          while ((bis.read(originBinary)) != -1) {
+            hexValue = Hex.encodeHex(originBinary);
+          }
+
+          String txtFileName = files.get(i).toString().split(".jpg")[0] + ".txt";
+          BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(txtFileName));
+          String txtValue = null;
+          byte[] txtBinary = null;
+          txtBinary = new byte[txtBis.available()];
+          while ((txtBis.read(txtBinary)) != -1) {
+            txtValue = new String(txtBinary, "UTF-8");
+          }
+          // write data
+          System.out.println(files.get(i).toString());
+          writer.write(new String[]{String.valueOf(i), files.get(i).toString(), String.valueOf(hexValue),
+              txtFileName, txtValue});
+          bis.close();
+        }
+      }
+      writer.close();
+    }
+
+    // 1. read with file list
+    List fileLists = listFiles(path, CarbonTablePath.CARBON_DATA_EXT);
+    int fileNum = fileLists.size() / 2;
+
+    Schema schema = CarbonSchemaReader.readSchema((String) fileLists.get(0)).asOriginOrder();
+    List projectionLists = new ArrayList();
+    projectionLists.add((schema.getFields())[1].getFieldName());
+    projectionLists.add((schema.getFields())[2].getFieldName());
+
+    CarbonReader reader = ArrowCarbonReader
+        .builder()
+        .withFileLists(fileLists.subList(0, fileNum))
+        .projection(projectionLists)
+        .buildArrowReader();
+
+    System.out.println("\nData:");
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+
+      assertEquals(2, row.length);
+      byte[] outputBinary = (byte[]) row[1];
+      System.out.println(row[0] + " " + row[1] + " image size:" + outputBinary.length);
+      i++;
+    }
+    assert (i == fileNum * 3);
+    System.out.println("\nFinished: " + i);
+    reader.close();
+
+    // 2. read withFile
+    CarbonReader reader2 = CarbonReader
+        .builder()
+        .withFile(fileLists.get(0).toString())
+        .build();
+
+    System.out.println("\nData2:");
+    i = 0;
+    while (i < 20 && reader2.hasNext()) {
+      Object[] row = (Object[]) reader2.readNextRow();
+      assertEquals(5, row.length);
+
+      assert (null != row[0].toString());
+      assert (null != row[3].toString());
+      assert (null != row[4].toString());
+      byte[] outputBinary = (byte[]) row[1];
+
+      String txt = row[2].toString();
+      System.out.println(row[0] + " " + row[2] +
+          " image size:" + outputBinary.length + " txt size:" + txt.length());
+      i++;
+    }
+    System.out.println("\nFinished: " + i);
+    reader2.close();
+
+    // 3. read with folder
+    CarbonReader reader3 = CarbonReader
+        .builder(path)
+        .withFolder(path)
+        .build();
+
+    System.out.println("\nData:");
+    i = 0;
+    while (i < 20 && reader3.hasNext()) {
+      Object[] row = (Object[]) reader3.readNextRow();
+
+      byte[] outputBinary = (byte[]) row[1];
+      System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);
+      i++;
+    }
+    System.out.println("\nFinished: " + i);
+    reader3.close();
+
+    InputSplit[] splits = ArrowCarbonReader
+        .builder()
+        .withFileLists(fileLists.subList(0, fileNum))
+        .getSplits(true);
+    Assert.assertTrue(splits.length == fileNum);
+    for (int j = 0; j < splits.length; j++) {
+      ArrowCarbonReader.builder(splits[j]).build();
+    }
+  }
+
+  public void testGetSplitWithFileListsFromDifferentFolder() throws Exception {
+
+    String path1 = "./target/flowersFolder1";
+    String path2 = "./target/flowersFolder2";
+    writeCarbonFile(path1, 3);
+    writeCarbonFile(path2, 2);
+    List fileLists = listFiles(path1, CarbonTablePath.CARBON_DATA_EXT);
+    fileLists.addAll(listFiles(path2, CarbonTablePath.CARBON_DATA_EXT));
+
+    InputSplit[] splits = ArrowCarbonReader
+        .builder()
+        .withFileLists(fileLists)
+        .getSplits(true);
+    Assert.assertTrue(5 == splits.length);
+    for (int j = 0; j < splits.length; j++) {
+      ArrowCarbonReader.builder(splits[j]).build();
+    }
+  }
+
+  public void writeCarbonFile(String path, int num) throws Exception {
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    Field[] fields = new Field[5];
+    fields[0] = new Field("imageId", DataTypes.INT);
+    fields[1] = new Field("imageName", DataTypes.STRING);
+    fields[2] = new Field("imageBinary", DataTypes.BINARY);
+    fields[3] = new Field("txtName", DataTypes.STRING);
+    fields[4] = new Field("txtContent", DataTypes.STRING);
+
+    String imageFolder = "./src/test/resources/image/flowers";
+
+    byte[] originBinary = null;
+
+    // read and write image data
+    for (int j = 0; j < num; j++) {
+      CarbonWriter writer = CarbonWriter
+          .builder()
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("SDKS3Example")
+          .withPageSizeInMb(1)
+          .build();
+      ArrayList files = listFiles(imageFolder, ".jpg");
+
+      if (null != files) {
+        for (int i = 0; i < files.size(); i++) {
+          // read image and encode to Hex
+          BufferedInputStream bis = new BufferedInputStream(new FileInputStream(files.get(i).toString()));
+          char[] hexValue = null;
+          originBinary = new byte[bis.available()];
+          while ((bis.read(originBinary)) != -1) {
+            hexValue = Hex.encodeHex(originBinary);
+          }
+
+          String txtFileName = files.get(i).toString().split(".jpg")[0] + ".txt";
+          BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(txtFileName));
+          String txtValue = null;
+          byte[] txtBinary = null;
+          txtBinary = new byte[txtBis.available()];
+          while ((txtBis.read(txtBinary)) != -1) {
+            txtValue = new String(txtBinary, "UTF-8");
+          }
+          // write data
+          System.out.println(files.get(i).toString());
+          writer.write(new String[]{String.valueOf(i), files.get(i).toString(), String.valueOf(hexValue),
+              txtFileName, txtValue});
+          bis.close();
+        }
+      }
+      writer.close();
+    }
+  }
+
 }