You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by zh...@apache.org on 2019/05/22 01:46:22 UTC
[carbondata] branch master updated: [CARBONDATA-3363] SDK supports
reading carbon data from a given file list, file or folder
This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 7541ef2 [CARBONDATA-3363] SDK supports reading carbon data from a given file list, file or folder
7541ef2 is described below
commit 7541ef25f50e5ba6d70cb2338123631365777fbf
Author: xubo245 <xu...@huawei.com>
AuthorDate: Mon Jan 28 20:27:30 2019 +0800
[CARBONDATA-3363] SDK supports reading carbon data from a given file list, file or folder
SDK supports reading carbon data from a given file list, file or folder
This closes #3194
---
.../carbondata/examples/sdk/SDKS3ReadExample.java | 111 ++++++-----
.../hadoop/api/CarbonFileInputFormat.java | 33 +++-
.../carbondata/hadoop/api/CarbonInputFormat.java | 5 +
.../apache/carbondata/sdk/file/CarbonReader.java | 11 ++
.../carbondata/sdk/file/CarbonReaderBuilder.java | 75 ++++++-
.../apache/carbondata/sdk/file/utils/SDKUtil.java | 2 +-
.../org/apache/carbondata/sdk/file/ImageTest.java | 220 +++++++++++++++++++++
7 files changed, 404 insertions(+), 53 deletions(-)
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
index 94e4c8d..92afd0b 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
@@ -17,16 +17,20 @@
package org.apache.carbondata.examples.sdk;
+import java.util.List;
+
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.sdk.file.CarbonReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
@@ -35,60 +39,69 @@ import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
* Example for testing carbonReader on S3
*/
public class SDKS3ReadExample {
- public static void main(String[] args) throws Exception {
- Logger logger = LogServiceFactory.getLogService(SDKS3ReadExample.class.getName());
- if (args == null || args.length < 3) {
- logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>"
- + "<s3-endpoint> [table-path-on-s3]");
- System.exit(0);
- }
+ public static void main(String[] args) throws Exception {
+ Logger logger = LogServiceFactory.getLogService(SDKS3ReadExample.class.getName());
+ if (args == null || args.length < 3) {
+ logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>"
+ + "<s3-endpoint> [table-path-on-s3]");
+ System.exit(0);
+ }
+
+ String path = "s3a://sdk/WriterOutput/carbondata5";
+ if (args.length > 3) {
+ path = args[3];
+ }
+
+ // 1. read with file list
+ Configuration conf = new Configuration();
- String path = "s3a://sdk/WriterOutput/carbondata5";
- if (args.length > 3) {
- path=args[3];
- }
+ conf.set(ACCESS_KEY, args[0]);
+ conf.set(SECRET_KEY, args[1]);
+ conf.set(ENDPOINT, args[2]);
+ List fileLists = listFiles(path, CarbonTablePath.CARBON_DATA_EXT, conf);
- // Read data
- EqualToExpression equalToExpression = new EqualToExpression(
- new ColumnExpression("name", DataTypes.STRING),
- new LiteralExpression("robot1", DataTypes.STRING));
+ // Read data
+ EqualToExpression equalToExpression = new EqualToExpression(
+ new ColumnExpression("name", DataTypes.STRING),
+ new LiteralExpression("robot1", DataTypes.STRING));
- CarbonReader reader = CarbonReader
- .builder(path, "_temp")
- .projection(new String[]{"name", "age"})
- .filter(equalToExpression)
- .withHadoopConf(ACCESS_KEY, args[0])
- .withHadoopConf(SECRET_KEY, args[1])
- .withHadoopConf(ENDPOINT, args[2])
- .build();
+ CarbonReader reader = CarbonReader
+ .builder()
+ .projection(new String[]{"name", "age"})
+ .filter(equalToExpression)
+ .withHadoopConf(ACCESS_KEY, args[0])
+ .withHadoopConf(SECRET_KEY, args[1])
+ .withHadoopConf(ENDPOINT, args[2])
+ .withFileLists(fileLists)
+ .build();
- System.out.println("\nData:");
- int i = 0;
- while (i < 20 && reader.hasNext()) {
- Object[] row = (Object[]) reader.readNextRow();
- System.out.println(row[0] + " " + row[1]);
- i++;
- }
- System.out.println("\nFinished");
- reader.close();
+ System.out.println("\nData:");
+ int i = 0;
+ while (i < 20 && reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ System.out.println(row[0] + " " + row[1]);
+ i++;
+ }
+ System.out.println("\nFinished");
+ reader.close();
- // Read without filter
- CarbonReader reader2 = CarbonReader
- .builder(path, "_temp")
- .projection(new String[]{"name", "age"})
- .withHadoopConf(ACCESS_KEY, args[0])
- .withHadoopConf(SECRET_KEY, args[1])
- .withHadoopConf(ENDPOINT, args[2])
- .build();
+ // Read without filter
+ CarbonReader reader2 = CarbonReader
+ .builder(path, "_temp")
+ .projection(new String[]{"name", "age"})
+ .withHadoopConf(ACCESS_KEY, args[0])
+ .withHadoopConf(SECRET_KEY, args[1])
+ .withHadoopConf(ENDPOINT, args[2])
+ .build();
- System.out.println("\nData:");
- i = 0;
- while (i < 20 && reader2.hasNext()) {
- Object[] row = (Object[]) reader2.readNextRow();
- System.out.println(row[0] + " " + row[1]);
- i++;
- }
- System.out.println("\nFinished");
- reader2.close();
+ System.out.println("\nData:");
+ i = 0;
+ while (i < 20 && reader2.hasNext()) {
+ Object[] row = (Object[]) reader2.readNextRow();
+ System.out.println(row[0] + " " + row[1]);
+ i++;
}
+ System.out.println("\nFinished");
+ reader2.close();
+ }
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 6051d4f..e83f898 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -150,7 +150,17 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
for (LoadMetadataDetails load : loadMetadataDetails) {
seg = new Segment(load.getLoadName(), null, readCommittedScope);
- externalTableSegments.add(seg);
+ if (fileLists != null) {
+ for (int i = 0; i < fileLists.size(); i++) {
+ if (fileLists.get(i).toString().endsWith(seg.getSegmentNo()
+ + CarbonTablePath.CARBON_DATA_EXT)) {
+ externalTableSegments.add(seg);
+ break;
+ }
+ }
+ } else {
+ externalTableSegments.add(seg);
+ }
}
}
List<InputSplit> splits = new ArrayList<>();
@@ -162,7 +172,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
// do block filtering and get split
splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
} else {
- for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
+ List<CarbonFile> carbonFiles = null;
+ if (null != this.fileLists) {
+ carbonFiles = getAllCarbonDataFiles(this.fileLists);
+ } else {
+ carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
+ }
+
+ for (CarbonFile carbonFile : carbonFiles) {
// Segment id is set to null because SDK does not write carbondata files with respect
// to segments. So no specific name is present for this load.
CarbonInputSplit split =
@@ -208,6 +225,18 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
return carbonFiles;
}
+ /**
+ * Resolves every entry of an explicit file list to a {@link CarbonFile}.
+ *
+ * @param fileLists carbondata file paths; each element's toString() is used as the path
+ * @return resolved files, in the same order as the input list
+ * @throws IllegalArgumentException if the list contains a null entry
+ * @throws RuntimeException wrapping any failure to resolve a path, naming that path
+ */
+ private List<CarbonFile> getAllCarbonDataFiles(List<?> fileLists) {
+   List<CarbonFile> carbonFiles = new ArrayList<>(fileLists.size());
+   // for-each instead of get(i): indexed access is O(n) per call on linked lists
+   for (Object file : fileLists) {
+     if (null == file) {
+       throw new IllegalArgumentException("fileLists must not contain null entries");
+     }
+     String filePath = file.toString();
+     try {
+       carbonFiles.add(FileFactory.getCarbonFile(filePath));
+     } catch (Exception e) {
+       throw new RuntimeException("Failed to resolve carbondata file: " + filePath, e);
+     }
+   }
+   return carbonFiles;
+ }
+
/**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index e31b5b1..9005f05 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -132,6 +132,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
protected int numStreamFiles = 0;
protected int hitedStreamFiles = 0;
protected int numBlocks = 0;
+ protected List fileLists = null;
private CarbonTable carbonTable;
@@ -156,6 +157,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
return numBlocks;
}
+ /**
+ * Restricts split computation to the given carbondata files instead of the whole
+ * table path. A defensive copy is stored, so later changes to the caller's list do
+ * not affect this input format.
+ *
+ * @param fileLists carbondata file paths, or null to fall back to the table path
+ */
+ public void setFileLists(List fileLists) {
+   this.fileLists = (null == fileLists) ? null : new java.util.ArrayList<Object>(fileLists);
+ }
+
/**
* Set the `tableInfo` in `configuration`
*/
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 6ed6624..6e43f7f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -171,6 +171,17 @@ public class CarbonReader<T> {
}
/**
+ * Creates a {@link CarbonReaderBuilder} that is not yet bound to a table path; the
+ * input (a folder or a list of carbondata files) is configured on the builder.
+ * A unique placeholder table name, "UnknownTable" followed by a random UUID, is used.
+ *
+ * @return a fresh CarbonReaderBuilder
+ */
+ public static CarbonReaderBuilder builder() {
+   return new CarbonReaderBuilder("UnknownTable" + UUID.randomUUID());
+ }
+
+ /**
* Breaks the list of CarbonRecordReader in CarbonReader into multiple
* CarbonReader objects, each iterating through some 'carbondata' files
* and return that list of CarbonReader objects
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index ef79b05..6ead50d 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -63,6 +64,7 @@ public class CarbonReaderBuilder {
private boolean useVectorReader = true;
private InputSplit inputSplit;
private boolean useArrowReader;
+ private List fileLists;
/**
* Construct a CarbonReaderBuilder with table path and table name
@@ -82,6 +84,54 @@ public class CarbonReaderBuilder {
}
/**
+ * Package-private constructor for a builder created without a table path.
+ * Installs a new CarbonSessionInfo through ThreadLocalSessionInfo and records the name
+ * that build() later passes to CarbonTable.buildTable.
+ *
+ * @param tableName name given to the table built from the input
+ */
+ CarbonReaderBuilder(String tableName) {
+   ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
+   this.tableName = tableName;
+ }
+
+ /**
+ * Reads the carbondata files stored under the given folder. If a file list is also
+ * configured, build() uses the file list instead.
+ *
+ * @param tablePath folder holding the carbondata files
+ * @return this builder, for chaining
+ */
+ public CarbonReaderBuilder withFolder(String tablePath) {
+   this.tablePath = tablePath;
+   return this;
+ }
+
+ /**
+ * Adds carbondata files to read. May be called repeatedly; files accumulate.
+ * The builder keeps its own copy, so the caller's list is never modified and
+ * unmodifiable lists or subList views are accepted.
+ *
+ * @param fileLists carbondata file paths (each element's toString() is used as the path)
+ * @return this builder, for chaining
+ * @throws IllegalArgumentException if fileLists is null
+ */
+ public CarbonReaderBuilder withFileLists(List fileLists) {
+   if (null == fileLists) {
+     throw new IllegalArgumentException("fileLists should not be null");
+   }
+   if (null == this.fileLists) {
+     // copy instead of aliasing: a later call appends here, which would otherwise mutate
+     // the caller's list (or throw for immutable lists / write through a subList view)
+     this.fileLists = new ArrayList();
+   }
+   this.fileLists.addAll(fileLists);
+   return this;
+ }
+
+ /**
+ * Adds a single carbondata file to read; same as calling
+ * {@link #withFileLists(List)} with a one-element list.
+ *
+ * @param file path of the carbondata file
+ * @return this builder, for chaining
+ */
+ public CarbonReaderBuilder withFile(String file) {
+   List singleFile = new ArrayList(1);
+   singleFile.add(file);
+   return this.withFileLists(singleFile);
+ }
+
+ /**
* Configure the projection column names of carbon reader
*
* @param projectionColumnNames projection column names
@@ -190,7 +240,27 @@ public class CarbonReaderBuilder {
((CarbonInputSplit) inputSplit).getSegment().getReadCommittedScope().getFilePath();
tableName = "UnknownTable" + UUID.randomUUID();
}
- CarbonTable table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
+ CarbonTable table;
+ // now always infer schema. TODO:Refactor in next version.
+ if (null == this.fileLists && null == tablePath) {
+ throw new IllegalArgumentException("Please set table path first.");
+ }
+ if (null != this.fileLists) {
+ if (fileLists.size() < 1) {
+ throw new IllegalArgumentException("fileLists must have one file in list as least!");
+ }
+ String commonString = String.valueOf(fileLists.get(0));
+ for (int i = 1; i < fileLists.size(); i++) {
+ commonString = commonString.substring(0, StringUtils.indexOfDifference(commonString,
+ String.valueOf(fileLists.get(i))));
+ }
+ int index = commonString.lastIndexOf("/");
+ commonString = commonString.substring(0, index);
+
+ table = CarbonTable.buildTable(commonString, tableName, hadoopConf);
+ } else {
+ table = CarbonTable.buildTable(tablePath, tableName, hadoopConf);
+ }
if (enableBlockletDistribution) {
// set cache level to blocklet level
Map<String, String> tableProperties =
@@ -206,6 +276,9 @@ public class CarbonReaderBuilder {
if (filterExpression != null) {
format.setFilterPredicates(job.getConfiguration(), filterExpression);
}
+ if (null != this.fileLists) {
+ format.setFileLists(this.fileLists);
+ }
if (projectionColumns != null) {
// set the user projection
int len = projectionColumns.length;
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
index 9fec185..6533fab 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/utils/SDKUtil.java
@@ -34,7 +34,7 @@ public class SDKUtil {
final String suf, Configuration conf) throws Exception {
final String sufImageFinal = suf;
ArrayList result = new ArrayList();
- CarbonFile[] fileList = FileFactory.getCarbonFile(sourceImageFolder).listFiles();
+ CarbonFile[] fileList = FileFactory.getCarbonFile(sourceImageFolder, conf).listFiles();
for (int i = 0; i < fileList.length; i++) {
if (fileList[i].isDirectory()) {
result.addAll(listFiles(fileList[i].getCanonicalPath(), sufImageFinal, conf));
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index e69a981..3bfea26 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -26,11 +26,13 @@ import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.util.BinaryUtil;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Assert;
import org.junit.Test;
@@ -43,6 +45,7 @@ import javax.imageio.stream.ImageInputStream;
import java.awt.color.ColorSpace;
import java.awt.image.BufferedImage;
import java.io.*;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -815,4 +818,221 @@ public class ImageTest extends TestCase {
reader.close();
}
+ @Test
+ public void testBinaryWithProjectionAndFileListsAndWithFile() throws Exception {
+ int num = 5;
+ String path = "./target/flowersFolder";
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Field[] fields = new Field[5];
+ fields[0] = new Field("imageId", DataTypes.INT);
+ fields[1] = new Field("imageName", DataTypes.STRING);
+ fields[2] = new Field("imageBinary", DataTypes.BINARY);
+ fields[3] = new Field("txtName", DataTypes.STRING);
+ fields[4] = new Field("txtContent", DataTypes.STRING);
+
+ String imageFolder = "./src/test/resources/image/flowers";
+
+ byte[] originBinary = null;
+
+ // read and write image data
+ for (int j = 0; j < num; j++) {
+ CarbonWriter writer = CarbonWriter
+ .builder()
+ .outputPath(path)
+ .withCsvInput(new Schema(fields))
+ .writtenBy("SDKS3Example")
+ .withPageSizeInMb(1)
+ .build();
+ ArrayList files = listFiles(imageFolder, ".jpg");
+
+ if (null != files) {
+ for (int i = 0; i < files.size(); i++) {
+ // read image and encode to Hex
+ BufferedInputStream bis = new BufferedInputStream(new FileInputStream(files.get(i).toString()));
+ char[] hexValue = null;
+ originBinary = new byte[bis.available()];
+ while ((bis.read(originBinary)) != -1) {
+ hexValue = Hex.encodeHex(originBinary);
+ }
+
+ String txtFileName = files.get(i).toString().split(".jpg")[0] + ".txt";
+ BufferedInputStream txtBis = new BufferedInputStream(new FileInputStream(txtFileName));
+ String txtValue = null;
+ byte[] txtBinary = null;
+ txtBinary = new byte[txtBis.available()];
+ while ((txtBis.read(txtBinary)) != -1) {
+ txtValue = new String(txtBinary, "UTF-8");
+ }
+ // write data
+ System.out.println(files.get(i).toString());
+ writer.write(new String[]{String.valueOf(i), files.get(i).toString(), String.valueOf(hexValue),
+ txtFileName, txtValue});
+ bis.close();
+ }
+ }
+ writer.close();
+ }
+
+ // 1. read with file list
+ List fileLists = listFiles(path, CarbonTablePath.CARBON_DATA_EXT);
+ int fileNum = fileLists.size() / 2;
+
+ Schema schema = CarbonSchemaReader.readSchema((String) fileLists.get(0)).asOriginOrder();
+ List projectionLists = new ArrayList();
+ projectionLists.add((schema.getFields())[1].getFieldName());
+ projectionLists.add((schema.getFields())[2].getFieldName());
+
+ CarbonReader reader = ArrowCarbonReader
+ .builder()
+ .withFileLists(fileLists.subList(0, fileNum))
+ .projection(projectionLists)
+ .buildArrowReader();
+
+ System.out.println("\nData:");
+ int i = 0;
+ while (i < 20 && reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+
+ assertEquals(2, row.length);
+ byte[] outputBinary = (byte[]) row[1];
+ System.out.println(row[0] + " " + row[1] + " image size:" + outputBinary.length);
+ i++;
+ }
+ assert (i == fileNum * 3);
+ System.out.println("\nFinished: " + i);
+ reader.close();
+
+ // 2. read withFile
+ CarbonReader reader2 = CarbonReader
+ .builder()
+ .withFile(fileLists.get(0).toString())
+ .build();
+
+ System.out.println("\nData2:");
+ i = 0;
+ while (i < 20 && reader2.hasNext()) {
+ Object[] row = (Object[]) reader2.readNextRow();
+ assertEquals(5, row.length);
+
+ assert (null != row[0].toString());
+ assert (null != row[0].toString());
+ assert (null != row[0].toString());
+ byte[] outputBinary = (byte[]) row[1];
+
+ String txt = row[2].toString();
+ System.out.println(row[0] + " " + row[2] +
+ " image size:" + outputBinary.length + " txt size:" + txt.length());
+ i++;
+ }
+ System.out.println("\nFinished: " + i);
+ reader2.close();
+
+ // 3. read with folder
+ CarbonReader reader3 = CarbonReader
+ .builder(path)
+ .withFolder(path)
+ .build();
+
+ System.out.println("\nData:");
+ i = 0;
+ while (i < 20 && reader3.hasNext()) {
+ Object[] row = (Object[]) reader3.readNextRow();
+
+ byte[] outputBinary = (byte[]) row[1];
+ System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);
+ i++;
+ }
+ System.out.println("\nFinished: " + i);
+ reader3.close();
+
+ InputSplit[] splits = ArrowCarbonReader
+ .builder()
+ .withFileLists(fileLists.subList(0, fileNum))
+ .getSplits(true);
+ Assert.assertTrue(splits.length == fileNum);
+ for (int j = 0; j < splits.length; j++) {
+ ArrowCarbonReader.builder(splits[j]).build();
+ }
+ }
+
+ @Test
+ public void testGetSplitWithFileListsFromDifferentFolder() throws Exception {
+   // 3 files in one folder plus 2 in another: expect one split per file
+   String path1 = "./target/flowersFolder1";
+   String path2 = "./target/flowersFolder2";
+   writeCarbonFile(path1, 3);
+   writeCarbonFile(path2, 2);
+   List fileLists = listFiles(path1, CarbonTablePath.CARBON_DATA_EXT);
+   fileLists.addAll(listFiles(path2, CarbonTablePath.CARBON_DATA_EXT));
+
+   InputSplit[] splits = ArrowCarbonReader
+       .builder()
+       .withFileLists(fileLists)
+       .getSplits(true);
+   assertEquals(5, splits.length);
+   for (InputSplit split : splits) {
+     // close each reader so file handles are not leaked across tests
+     ArrowCarbonReader.builder(split).build().close();
+   }
+ }
+
+ /**
+ * Deletes {@code path}, then writes {@code num} carbondata files into it. Every file
+ * holds one row per .jpg in the flowers test folder: row index, image path, hex-encoded
+ * image bytes, path of the matching .txt file and that file's UTF-8 content.
+ */
+ public void writeCarbonFile(String path, int num) throws Exception {
+   try {
+     FileUtils.deleteDirectory(new File(path));
+   } catch (IOException e) {
+     e.printStackTrace();
+   }
+   Field[] fields = new Field[5];
+   fields[0] = new Field("imageId", DataTypes.INT);
+   fields[1] = new Field("imageName", DataTypes.STRING);
+   fields[2] = new Field("imageBinary", DataTypes.BINARY);
+   fields[3] = new Field("txtName", DataTypes.STRING);
+   fields[4] = new Field("txtContent", DataTypes.STRING);
+
+   String imageFolder = "./src/test/resources/image/flowers";
+
+   // read and write image data
+   for (int j = 0; j < num; j++) {
+     CarbonWriter writer = CarbonWriter
+         .builder()
+         .outputPath(path)
+         .withCsvInput(new Schema(fields))
+         .writtenBy("SDKS3Example")
+         .withPageSizeInMb(1)
+         .build();
+     try {
+       ArrayList files = listFiles(imageFolder, ".jpg");
+       if (null != files) {
+         for (int i = 0; i < files.size(); i++) {
+           String imagePath = files.get(i).toString();
+           // FileUtils opens and closes the streams itself (no leak, and no endless
+           // read loop on an empty file as with available()/read())
+           char[] hexValue = Hex.encodeHex(FileUtils.readFileToByteArray(new File(imagePath)));
+           // strip the literal ".jpg" suffix; String.split would treat it as a regex
+           int suffixStart = imagePath.lastIndexOf(".jpg");
+           String baseName = suffixStart >= 0 ? imagePath.substring(0, suffixStart) : imagePath;
+           String txtFileName = baseName + ".txt";
+           String txtValue = FileUtils.readFileToString(new File(txtFileName), "UTF-8");
+           // write data
+           System.out.println(imagePath);
+           writer.write(new String[]{String.valueOf(i), imagePath, String.valueOf(hexValue),
+               txtFileName, txtValue});
+         }
+       }
+     } finally {
+       writer.close();
+     }
+   }
+ }
+
}