You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/20 15:44:35 UTC
[02/10] carbondata git commit: [CARBONDATA-2253][SDK] Support write
JSON/Avro data to carbon files
[CARBONDATA-2253][SDK] Support write JSON/Avro data to carbon files
This PR adds AvroCarbonWriter to the SDK; it can be used to write JSON or Avro data to carbon files.
This closes #2061
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e39b0a14
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e39b0a14
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e39b0a14
Branch: refs/heads/carbonfile
Commit: e39b0a14a196224dbe6fdce2ff53e09b3b76b876
Parents: 04ff367
Author: Jacky Li <ja...@qq.com>
Authored: Wed Mar 14 00:06:37 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 23:02:37 2018 +0800
----------------------------------------------------------------------
pom.xml | 6 -
store/sdk/pom.xml | 95 +------
.../carbondata/sdk/file/AvroCarbonWriter.java | 125 +++++++++
.../sdk/file/CarbonWriterBuilder.java | 29 +-
.../sdk/file/AvroCarbonWriterTest.java | 104 ++++++++
.../sdk/file/CSVCarbonWriterSuite.java | 267 -------------------
.../sdk/file/CSVCarbonWriterTest.java | 267 +++++++++++++++++++
7 files changed, 519 insertions(+), 374 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 972be1e..287e052 100644
--- a/pom.xml
+++ b/pom.xml
@@ -579,12 +579,6 @@
<id>include-all</id>
</profile>
<profile>
- <id>store-sdk</id>
- <modules>
- <module>store/sdk</module>
- </modules>
- </profile>
- <profile>
<id>sdvtest</id>
<modules>
<module>integration/spark-common-cluster-test</module>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 9f7038a..1d1735e 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -25,55 +25,27 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>tech.allegro.schema.json2avro</groupId>
+ <artifactId>converter</artifactId>
+ <version>0.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
+ <sourceDirectory>src/main/java</sourceDirectory>
<resources>
<resource>
- <directory>src/resources</directory>
- </resource>
- <resource>
<directory>.</directory>
</resource>
</resources>
<plugins>
<plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <id>testCompile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- <phase>test</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
@@ -82,61 +54,6 @@
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.18</version>
- <!-- Note config is repeated in scalatest config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
- <systemProperties>
- <java.awt.headless>true</java.awt.headless>
- </systemProperties>
- <failIfNoTests>false</failIfNoTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <!-- Note config is repeated in surefire config -->
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>CarbonTestSuite.txt</filereports>
- <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
- </argLine>
- <stderr />
- <environmentVariables>
- </environmentVariables>
- <systemProperties>
- <java.awt.headless>true</java.awt.headless>
- </systemProperties>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-install-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
new file mode 100644
index 0000000..e88164c
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * Writer Implementation to write Avro Record to carbondata file.
+ */
+@InterfaceAudience.Internal
+class AvroCarbonWriter extends CarbonWriter {
+
+ private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
+ private TaskAttemptContext context;
+ private ObjectArrayWritable writable;
+ private Schema avroSchema;
+
+ AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
+ Configuration hadoopConf = new Configuration();
+ CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
+ CarbonTableOutputFormat format = new CarbonTableOutputFormat();
+ JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
+ Random random = new Random();
+ TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
+ TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
+ TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, attemptID);
+ this.recordWriter = format.getRecordWriter(context);
+ this.context = context;
+ this.writable = new ObjectArrayWritable();
+ }
+
+ private String[] avroToCsv(GenericData.Record avroRecord) {
+ if (avroSchema == null) {
+ avroSchema = avroRecord.getSchema();
+ }
+ List<Schema.Field> fields = avroSchema.getFields();
+ String[] csvField = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i));
+ }
+ return csvField;
+ }
+
+ private String avroFieldToString(Schema.Field fieldType, Object fieldValue) {
+ StringBuilder out = new StringBuilder();
+ Schema.Type type = fieldType.schema().getType();
+ switch (type) {
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case DOUBLE:
+ case STRING:
+ out.append(fieldValue.toString());
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ // TODO: convert complex type
+ }
+ return out.toString();
+ }
+
+ /**
+ * Write single row data, input row is Avro Record
+ */
+ @Override
+ public void write(Object object) throws IOException {
+ GenericData.Record record = (GenericData.Record) object;
+
+ // convert Avro record to CSV String[]
+ String[] csvRecord = avroToCsv(record);
+ writable.set(csvRecord);
+ try {
+ recordWriter.write(NullWritable.get(), writable);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Flush and close the writer
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ recordWriter.close(context);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 8734341..5be60c4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -104,7 +104,23 @@ public class CarbonWriterBuilder {
public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(schema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
+ CarbonLoadModel loadModel = createLoadModel();
+ return new CSVCarbonWriter(loadModel);
+ }
+
+ /**
+ * Build a {@link CarbonWriter}, which accepts Avro object
+ * @return
+ * @throws IOException
+ */
+ public CarbonWriter buildWriterForAvroInput() throws IOException, InvalidLoadOptionException {
+ Objects.requireNonNull(schema, "schema should not be null");
+ Objects.requireNonNull(path, "path should not be null");
+ CarbonLoadModel loadModel = createLoadModel();
+ return new AvroCarbonWriter(loadModel);
+ }
+ private CarbonLoadModel createLoadModel() throws IOException, InvalidLoadOptionException {
// build CarbonTable using schema
CarbonTable table = buildCarbonTable();
if (persistSchemaFile) {
@@ -113,18 +129,7 @@ public class CarbonWriterBuilder {
}
// build LoadModel
- CarbonLoadModel loadModel = buildLoadModel(table);
- return new CSVCarbonWriter(loadModel);
- }
-
- /**
- * Build a {@link CarbonWriter}, which accepts Avro object
- * @return
- * @throws IOException
- */
- public CarbonWriter buildWriterForAvroInput() throws IOException {
- // TODO
- throw new UnsupportedOperationException();
+ return buildLoadModel(table);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
new file mode 100644
index 0000000..25c34e0
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.CharEncoding;
+import org.junit.Assert;
+import org.junit.Test;
+
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+import org.apache.avro.Schema;
+
+public class AvroCarbonWriterTest {
+ private String path = "./AvroCarbonWriterSuiteWriteFiles";
+
+ @Test
+ public void testWriteBasic() throws IOException {
+ FileUtils.deleteDirectory(new File(path));
+
+ // Avro schema
+ String avroSchema =
+ "{" +
+ " \"type\" : \"record\"," +
+ " \"name\" : \"Acme\"," +
+ " \"fields\" : ["
+ + "{ \"name\" : \"name\", \"type\" : \"string\" },"
+ + "{ \"name\" : \"age\", \"type\" : \"int\" }]" +
+ "}";
+
+ String json = "{\"name\":\"bob\", \"age\":10}";
+
+ // conversion to GenericData.Record
+ JsonAvroConverter converter = new JsonAvroConverter();
+ GenericData.Record record = converter.convertToGenericDataRecord(
+ json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema));
+
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.STRING);
+
+ try {
+ CarbonWriter writer = CarbonWriter.builder()
+ .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+ .outputPath(path)
+ .buildWriterForAvroInput();
+
+ for (int i = 0; i < 100; i++) {
+ writer.write(record);
+ }
+ writer.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+ Assert.assertTrue(segmentFolder.exists());
+
+ File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ Assert.assertNotNull(dataFiles);
+ Assert.assertEquals(1, dataFiles.length);
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testWriteAllPrimitive() throws IOException {
+ // TODO
+ }
+
+ @Test
+ public void testWriteNestedRecord() throws IOException {
+ // TODO
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
deleted file mode 100644
index 0ac6f38..0000000
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.sdk.file;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test suite for {@link CSVCarbonWriter}
- */
-public class CSVCarbonWriterSuite {
-
- @Test
- public void testWriteFiles() throws IOException {
- String path = "./testWriteFiles";
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[2];
- fields[0] = new Field("name", DataTypes.STRING);
- fields[1] = new Field("age", DataTypes.INT);
-
- writeFilesAndVerify(new Schema(fields), path);
-
- FileUtils.deleteDirectory(new File(path));
- }
-
- @Test
- public void testWriteFilesJsonSchema() throws IOException {
- String path = "./testWriteFilesJsonSchema";
- FileUtils.deleteDirectory(new File(path));
-
- String schema = new StringBuilder()
- .append("[ \n")
- .append(" {\"name\":\"string\"},\n")
- .append(" {\"age\":\"int\"},\n")
- .append(" {\"height\":\"double\"}\n")
- .append("]")
- .toString();
-
- writeFilesAndVerify(Schema.parseJson(schema), path);
-
- FileUtils.deleteDirectory(new File(path));
- }
-
- private void writeFilesAndVerify(Schema schema, String path) {
- writeFilesAndVerify(schema, path, null);
- }
-
- private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
- writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
- }
-
- private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
- writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
- }
-
- /**
- * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
- * @param rows number of rows to write
- * @param schema schema of the file
- * @param path local write path
- * @param sortColumns sort columns
- * @param persistSchema true if want to persist schema file
- * @param blockletSize blockletSize in the file, -1 for default size
- * @param blockSize blockSize in the file, -1 for default size
- */
- private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
- boolean persistSchema, int blockletSize, int blockSize) {
- try {
- CarbonWriterBuilder builder = CarbonWriter.builder()
- .withSchema(schema)
- .outputPath(path);
- if (sortColumns != null) {
- builder = builder.sortBy(sortColumns);
- }
- if (persistSchema) {
- builder = builder.persistSchemaFile(true);
- }
- if (blockletSize != -1) {
- builder = builder.withBlockletSize(blockletSize);
- }
- if (blockSize != -1) {
- builder = builder.withBlockSize(blockSize);
- }
-
- CarbonWriter writer = builder.buildWriterForCSVInput();
-
- for (int i = 0; i < rows; i++) {
- writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
- }
- writer.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
-
- File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
- Assert.assertTrue(segmentFolder.exists());
-
- File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
- @Override public boolean accept(File pathname) {
- return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
- }
- });
- Assert.assertNotNull(dataFiles);
- Assert.assertTrue(dataFiles.length > 0);
- }
-
- @Test
- public void testAllPrimitiveDataType() throws IOException {
- // TODO: write all data type and read by CarbonRecordReader to verify the content
- String path = "./testWriteFiles";
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[9];
- fields[0] = new Field("stringField", DataTypes.STRING);
- fields[1] = new Field("intField", DataTypes.INT);
- fields[2] = new Field("shortField", DataTypes.SHORT);
- fields[3] = new Field("longField", DataTypes.LONG);
- fields[4] = new Field("doubleField", DataTypes.DOUBLE);
- fields[5] = new Field("boolField", DataTypes.BOOLEAN);
- fields[6] = new Field("dateField", DataTypes.DATE);
- fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
- fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
-
- try {
- CarbonWriterBuilder builder = CarbonWriter.builder()
- .withSchema(new Schema(fields))
- .outputPath(path);
-
- CarbonWriter writer = builder.buildWriterForCSVInput();
-
- for (int i = 0; i < 100; i++) {
- String[] row = new String[]{
- "robot" + (i % 10),
- String.valueOf(i),
- String.valueOf(i),
- String.valueOf(Long.MAX_VALUE - i),
- String.valueOf((double) i / 2),
- String.valueOf(true),
- "2019-03-02",
- "2019-02-12 03:03:34"
- };
- writer.write(row);
- }
- writer.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
-
- File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
- Assert.assertTrue(segmentFolder.exists());
-
- File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
- @Override public boolean accept(File pathname) {
- return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
- }
- });
- Assert.assertNotNull(dataFiles);
- Assert.assertTrue(dataFiles.length > 0);
-
- FileUtils.deleteDirectory(new File(path));
- }
-
- @Test
- public void test2Blocklet() throws IOException {
- String path = "./testWriteFiles";
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[2];
- fields[0] = new Field("name", DataTypes.STRING);
- fields[1] = new Field("age", DataTypes.INT);
-
- writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
-
- // TODO: implement reader to verify the number of blocklet in the file
-
- FileUtils.deleteDirectory(new File(path));
- }
-
- @Test
- public void test2Block() throws IOException {
- String path = "./testWriteFiles";
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[2];
- fields[0] = new Field("name", DataTypes.STRING);
- fields[1] = new Field("age", DataTypes.INT);
-
- writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
-
- File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
- File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
- @Override public boolean accept(File pathname) {
- return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
- }
- });
- Assert.assertNotNull(dataFiles);
- Assert.assertEquals(2, dataFiles.length);
-
- FileUtils.deleteDirectory(new File(path));
- }
-
- @Test
- public void testSortColumns() throws IOException {
- String path = "./testWriteFiles";
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[2];
- fields[0] = new Field("name", DataTypes.STRING);
- fields[1] = new Field("age", DataTypes.INT);
-
- writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
-
- // TODO: implement reader and verify the data is sorted
-
- FileUtils.deleteDirectory(new File(path));
- }
-
- @Test
- public void testPartitionOutput() {
- // TODO: test write data with partition
- }
-
- @Test
- public void testSchemaPersistence() throws IOException {
- String path = "./testWriteFiles";
- FileUtils.deleteDirectory(new File(path));
-
- Field[] fields = new Field[2];
- fields[0] = new Field("name", DataTypes.STRING);
- fields[1] = new Field("age", DataTypes.INT);
-
- writeFilesAndVerify(new Schema(fields), path, true);
-
- String schemaFile = CarbonTablePath.getSchemaFilePath(path);
- Assert.assertTrue(new File(schemaFile).exists());
-
- FileUtils.deleteDirectory(new File(path));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
new file mode 100644
index 0000000..2281fe6
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link CSVCarbonWriter}
+ */
+public class CSVCarbonWriterTest {
+
+ @Test
+ public void testWriteFiles() throws IOException {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+
+ writeFilesAndVerify(new Schema(fields), path);
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testWriteFilesJsonSchema() throws IOException {
+ String path = "./testWriteFilesJsonSchema";
+ FileUtils.deleteDirectory(new File(path));
+
+ String schema = new StringBuilder()
+ .append("[ \n")
+ .append(" {\"name\":\"string\"},\n")
+ .append(" {\"age\":\"int\"},\n")
+ .append(" {\"height\":\"double\"}\n")
+ .append("]")
+ .toString();
+
+ writeFilesAndVerify(Schema.parseJson(schema), path);
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ private void writeFilesAndVerify(Schema schema, String path) {
+ writeFilesAndVerify(schema, path, null);
+ }
+
+ private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+ writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+ }
+
+ private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+ writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+ }
+
+ /**
+ * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
+ * @param rows number of rows to write
+ * @param schema schema of the file
+ * @param path local write path
+ * @param sortColumns sort columns
+ * @param persistSchema true if want to persist schema file
+ * @param blockletSize blockletSize in the file, -1 for default size
+ * @param blockSize blockSize in the file, -1 for default size
+ */
+ private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+ boolean persistSchema, int blockletSize, int blockSize) {
+ try {
+ CarbonWriterBuilder builder = CarbonWriter.builder()
+ .withSchema(schema)
+ .outputPath(path);
+ if (sortColumns != null) {
+ builder = builder.sortBy(sortColumns);
+ }
+ if (persistSchema) {
+ builder = builder.persistSchemaFile(true);
+ }
+ if (blockletSize != -1) {
+ builder = builder.withBlockletSize(blockletSize);
+ }
+ if (blockSize != -1) {
+ builder = builder.withBlockSize(blockSize);
+ }
+
+ CarbonWriter writer = builder.buildWriterForCSVInput();
+
+ for (int i = 0; i < rows; i++) {
+ writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
+ }
+ writer.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+ Assert.assertTrue(segmentFolder.exists());
+
+ File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ Assert.assertNotNull(dataFiles);
+ Assert.assertTrue(dataFiles.length > 0);
+ }
+
+ @Test
+ public void testAllPrimitiveDataType() throws IOException {
+ // TODO: write all data type and read by CarbonRecordReader to verify the content
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[9];
+ fields[0] = new Field("stringField", DataTypes.STRING);
+ fields[1] = new Field("intField", DataTypes.INT);
+ fields[2] = new Field("shortField", DataTypes.SHORT);
+ fields[3] = new Field("longField", DataTypes.LONG);
+ fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+ fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+ fields[6] = new Field("dateField", DataTypes.DATE);
+ fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+ fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+ try {
+ CarbonWriterBuilder builder = CarbonWriter.builder()
+ .withSchema(new Schema(fields))
+ .outputPath(path);
+
+ CarbonWriter writer = builder.buildWriterForCSVInput();
+
+ for (int i = 0; i < 100; i++) {
+ String[] row = new String[]{
+ "robot" + (i % 10),
+ String.valueOf(i),
+ String.valueOf(i),
+ String.valueOf(Long.MAX_VALUE - i),
+ String.valueOf((double) i / 2),
+ String.valueOf(true),
+ "2019-03-02",
+ "2019-02-12 03:03:34"
+ };
+ writer.write(row);
+ }
+ writer.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+ Assert.assertTrue(segmentFolder.exists());
+
+ File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ Assert.assertNotNull(dataFiles);
+ Assert.assertTrue(dataFiles.length > 0);
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void test2Blocklet() throws IOException {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+
+ writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+
+ // TODO: implement reader to verify the number of blocklet in the file
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void test2Block() throws IOException {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+
+ writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+
+ File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+ File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+ @Override public boolean accept(File pathname) {
+ return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+ }
+ });
+ Assert.assertNotNull(dataFiles);
+ Assert.assertEquals(2, dataFiles.length);
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testSortColumns() throws IOException {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+
+ writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+ // TODO: implement reader and verify the data is sorted
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testPartitionOutput() {
+ // TODO: test write data with partition
+ }
+
+ @Test
+ public void testSchemaPersistence() throws IOException {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+
+ writeFilesAndVerify(new Schema(fields), path, true);
+
+ String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+ Assert.assertTrue(new File(schemaFile).exists());
+
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+}