Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/19 15:31:46 UTC

[carbondata] 01/02: [CARBONDATA-3365] Integrate apache arrow vector filling to carbon SDK

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit c85a11f0f900180dcf36976809c3d244fb24c161
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Wed Feb 6 18:10:43 2019 +0530

    [CARBONDATA-3365] Integrate apache arrow vector filling to carbon SDK
    
    By integrating Arrow vector filling into carbon, data read from
    carbondata files can be used for analytics in any programming language
    supported by Arrow: an Arrow vector filled through the carbon Java SDK
    can be read from Python, C, C++ and many other languages.
    This also widens the scope of carbondata use cases, since Arrow is
    already integrated with many query engines.
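    
    As a rough usage sketch (the file path, table name and field names
    below are placeholders, and the entry point is assumed to be the
    existing CarbonReader.builder(path, tableName) API):
    
        static byte[] carbonToArrow() throws Exception {
          // Schema of the projected columns, built with the SDK types.
          Schema schema = new Schema(new Field[]{
              new Field("stringField", DataTypes.STRING),
              new Field("intField", DataTypes.INT)});
          CarbonReader<Object[]> reader = CarbonReader
              .builder("/path/to/carbon/files", "_temp")
              .projection(new String[]{"stringField", "intField"})
              .build();
          // Serializes all rows into the Arrow file format in one call.
          byte[] arrowBytes = reader.readArrowBatch(schema);
          reader.close();
          return arrowBytes;
        }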
    
    This closes #3193
---
 .../carbondata/examples/CarbonSessionExample.scala | 180 +++++------
 .../hadoop/api/CarbonFileInputFormat.java          |  20 +-
 .../carbondata/hadoop/api/CarbonInputFormat.java   |   3 +
 store/sdk/pom.xml                                  |  50 ++++
 .../apache/carbondata/sdk/file/CarbonReader.java   |  10 +
 .../carbondata/sdk/file/CarbonReaderBuilder.java   |  49 +++
 .../carbondata/sdk/file/arrow/ArrowConverter.java  |  73 +++++
 .../sdk/file/arrow/ArrowFieldWriter.java           | 328 +++++++++++++++++++++
 .../carbondata/sdk/file/arrow/ArrowUtils.java      | 111 +++++++
 .../carbondata/sdk/file/arrow/ArrowWriter.java     | 138 +++++++++
 10 files changed, 873 insertions(+), 89 deletions(-)

diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index b6921f2..3aa761e 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -37,7 +37,7 @@ object CarbonSessionExample {
       s"$rootPath/examples/spark2/src/main/resources/log4j.properties")
 
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
     val spark = ExampleUtils.createCarbonSession("CarbonSessionExample")
     spark.sparkContext.setLogLevel("INFO")
     exampleBody(spark)
@@ -49,92 +49,96 @@ object CarbonSessionExample {
     val rootPath = new File(this.getClass.getResource("/").getPath
                             + "../../../..").getCanonicalPath
 
-    spark.sql("DROP TABLE IF EXISTS source")
-
-    // Create table
-    spark.sql(
-      s"""
-         | CREATE TABLE source(
-         | shortField SHORT,
-         | intField INT,
-         | bigintField LONG,
-         | doubleField DOUBLE,
-         | stringField STRING,
-         | timestampField TIMESTAMP,
-         | decimalField DECIMAL(18,2),
-         | dateField DATE,
-         | charField CHAR(5),
-         | floatField FLOAT
-         | )
-         | STORED AS carbondata
-       """.stripMargin)
-
-    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
-
-    // scalastyle:off
-    spark.sql(
-      s"""
-         | LOAD DATA LOCAL INPATH '$path'
-         | INTO TABLE source
-         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
-       """.stripMargin)
-    // scalastyle:on
-
-    spark.sql(
-      s"""
-         | SELECT charField, stringField, intField
-         | FROM source
-         | WHERE stringfield = 'spark' AND decimalField > 40
-      """.stripMargin).show()
-
-    spark.sql(
-      s"""
-         | SELECT *
-         | FROM source WHERE length(stringField) = 5
-       """.stripMargin).show()
-
-    spark.sql(
-      s"""
-         | SELECT *
-         | FROM source WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
-       """.stripMargin).show()
-
-    spark.sql("SELECT count(stringField) FROM source").show()
-
-    spark.sql(
-      s"""
-         | SELECT sum(intField), stringField
-         | FROM source
-         | GROUP BY stringField
-       """.stripMargin).show()
-
-    spark.sql(
-      s"""
-         | SELECT t1.*, t2.*
-         | FROM source t1, source t2
-         | WHERE t1.stringField = t2.stringField
-      """.stripMargin).show()
-
-    spark.sql(
-      s"""
-         | WITH t1 AS (
-         | SELECT * FROM source
-         | UNION ALL
-         | SELECT * FROM source
-         | )
-         | SELECT t1.*, t2.*
-         | FROM t1, source t2
-         | WHERE t1.stringField = t2.stringField
-      """.stripMargin).show()
-
-    spark.sql(
-      s"""
-         | SELECT *
-         | FROM source
-         | WHERE stringField = 'spark' and floatField > 2.8
-       """.stripMargin).show()
-
-    // Drop table
-    spark.sql("DROP TABLE IF EXISTS source")
+//    spark.sql("DROP TABLE IF EXISTS source")
+//
+//    // Create table
+//    spark.sql(
+//      s"""
+//         | CREATE TABLE source(
+//         | shortField SHORT,
+//         | intField INT,
+//         | bigintField LONG,
+//         | doubleField DOUBLE,
+//         | stringField STRING,
+//         | timestampField TIMESTAMP,
+//         | decimalField DECIMAL(18,2),
+//         | dateField DATE,
+//         | charField CHAR(5),
+//         | floatField FLOAT
+//         | )
+//         | STORED AS carbondata
+//       """.stripMargin)
+//
+//    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+//
+//    // scalastyle:off
+//    spark.sql(
+//      s"""
+//         | LOAD DATA LOCAL INPATH '$path'
+//         | INTO TABLE source
+//         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
+//       """.stripMargin)
+//    // scalastyle:on
+//
+//    spark.sql(
+//      s"""
+//         | SELECT charField, stringField, intField
+//         | FROM source
+//         | WHERE stringfield = 'spark' AND decimalField > 40
+//      """.stripMargin).show()
+//
+//    spark.sql(
+//      s"""
+//         | SELECT *
+//         | FROM source WHERE length(stringField) = 5
+//       """.stripMargin).show()
+//
+//    spark.sql(
+//      s"""
+//         | SELECT *
+//         | FROM source WHERE date_format(dateField, "yyyy-MM-dd") = "2015-07-23"
+//       """.stripMargin).show()
+//
+//    spark.sql("SELECT count(stringField) FROM source").show()
+//
+//    spark.sql(
+//      s"""
+//         | SELECT sum(intField), stringField
+//         | FROM source
+//         | GROUP BY stringField
+//       """.stripMargin).show()
+//
+//    spark.sql(
+//      s"""
+//         | SELECT t1.*, t2.*
+//         | FROM source t1, source t2
+//         | WHERE t1.stringField = t2.stringField
+//      """.stripMargin).show()
+//
+//    spark.sql(
+//      s"""
+//         | WITH t1 AS (
+//         | SELECT * FROM source
+//         | UNION ALL
+//         | SELECT * FROM source
+//         | )
+//         | SELECT t1.*, t2.*
+//         | FROM t1, source t2
+//         | WHERE t1.stringField = t2.stringField
+//      """.stripMargin).show()
+//
+//    spark.sql(
+//      s"""
+//         | SELECT *
+//         | FROM source
+//         | WHERE stringField = 'spark' and floatField > 2.8
+//       """.stripMargin).show()
+//
+//    // Drop table
+//    spark.sql("DROP TABLE IF EXISTS source")
+//    spark.sql("create table p using parquet options('path' = '/home/root1/samplecsvfiles/parquetdata')")
+//    spark.sql("select * from p").show()
+//    spark.sql("create table c using carbon as select * from p")
+//    spark.sql("desc formatted c").show(truncate = false)
   }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index d81b02c..48774f7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -162,7 +162,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         // do block filtering and get split
         splits = getSplits(job, filter, externalTableSegments, null, partitionInfo, null);
       } else {
-        for (CarbonFile carbonFile : getAllCarbonDataFiles(carbonTable.getTablePath())) {
+        List<CarbonFile> carbonFiles = null;
+        if (null != this.fileLists) {
+          carbonFiles = getAllCarbonDataFiles(this.fileLists);
+        } else {
+          carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
+        }
+        for (CarbonFile carbonFile : carbonFiles) {
           // Segment id is set to null because SDK does not write carbondata files with respect
           // to segments. So no specific name is present for this load.
           CarbonInputSplit split =
@@ -208,6 +214,18 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     return carbonFiles;
   }
 
+  private List<CarbonFile> getAllCarbonDataFiles(List fileLists) {
+    List<CarbonFile> carbonFiles = new LinkedList<>();
+    try {
+      for (int i = 0; i < fileLists.size(); i++) {
+        carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString()));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return carbonFiles;
+  }
+
   /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 90532fb..0bcfea6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -806,4 +806,7 @@
     getQuerySegmentToAccess(conf, carbonTable.getDatabaseName(), tableName);
   }
 
+  public void setFileLists(List fileLists) {
+    this.fileLists = fileLists;
+  }
 }
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 90348d2d..7023736 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -45,6 +45,56 @@
       <artifactId>httpclient</artifactId>
       <version>${httpclient.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-format</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-plasma</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-flight</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-tools</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${dep.jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>${dep.jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${dep.jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.py4j</groupId>
+      <artifactId>py4j</artifactId>
+      <version>0.10.8.1</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index e5c0680..d7f08d4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
+import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
 
 import org.apache.hadoop.mapreduce.RecordReader;
 
@@ -94,6 +95,15 @@ public class CarbonReader<T> {
     return currentReader.getCurrentValue();
   }
 
+  /**
+   * Reads all remaining rows, fills them into Arrow vectors and returns
+   * the result serialized in the Arrow file format.
+   */
+  public byte[] readArrowBatch(Schema carbonSchema) throws Exception {
+    ArrowConverter arrowConverter = new ArrowConverter(carbonSchema, 10000);
+    while (hasNext()) {
+      arrowConverter.addToArrowBuffer(readNextBatchRow());
+    }
+    final byte[] bytes = arrowConverter.toSerializeArray();
+    arrowConverter.close();
+    return bytes;
+  }
+
   /**
    * Read and return next batch row objects
    */
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 82d2430..f1855ee 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 import org.apache.carbondata.hadoop.util.CarbonVectorizedRecordReader;
 
@@ -146,6 +147,54 @@ public class CarbonReaderBuilder {
     return this;
   }
 
+  /**
+   * Returns the paths of the carbondata files that this builder's table
+   * path and filter resolve to, using CarbonFileInputFormat.getSplits().
+   */
+  public String[] getSplits() throws IOException {
+    if (hadoopConf == null) {
+      hadoopConf = FileFactory.getConfiguration();
+    }
+    CarbonTable table;
+    // For now the schema is always inferred. TODO: Refactor in next version.
+    table = CarbonTable.buildTable(tablePath, tableName, hadoopConf, false);
+    final CarbonFileInputFormat format = new CarbonFileInputFormat();
+    final Job job = new Job(hadoopConf);
+    format.setTableInfo(job.getConfiguration(), table.getTableInfo());
+    format.setTablePath(job.getConfiguration(), table.getTablePath());
+    format.setTableName(job.getConfiguration(), table.getTableName());
+    format.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
+    if (filterExpression != null) {
+      format.setFilterPredicates(job.getConfiguration(), filterExpression);
+    }
+
+    if (projectionColumns != null) {
+      // set the user projection
+      int len = projectionColumns.length;
+      //      TODO : Handle projection of complex child columns
+      for (int i = 0; i < len; i++) {
+        if (projectionColumns[i].contains(".")) {
+          throw new UnsupportedOperationException(
+              "Complex child columns projection NOT supported through CarbonReader");
+        }
+      }
+      format.setColumnProjection(job.getConfiguration(), projectionColumns);
+    }
+
+    List<String> files = new ArrayList<>();
+    try {
+      if (filterExpression == null) {
+        job.getConfiguration().set("filter_blocks", "false");
+      }
+      List<InputSplit> splits =
+          format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID()));
+      for (InputSplit split : splits) {
+        files.add(((CarbonInputSplit) split).getPath().toUri().getPath());
+      }
+    } catch (Exception ex) {
+      // Clear the datamap cache as it can get added in getSplits() method
+      DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier());
+      throw ex;
+    }
+    return files.toArray(new String[files.size()]);
+  }
+
   /**
    * Build CarbonReader
    *
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
new file mode 100644
index 0000000..f4a8ba8
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowConverter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file.arrow;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.TimeZone;
+
+import org.apache.carbondata.sdk.file.Schema;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+
+/**
+ * Fills rows read through the carbon SDK into Arrow vectors and
+ * serializes them in the Arrow file format.
+ */
+public class ArrowConverter {
+
+  private final BufferAllocator allocator;
+  private VectorSchemaRoot root;
+  private ArrowWriter arrowWriter;
+  private org.apache.arrow.vector.types.pojo.Schema arrowSchema;
+  private ByteArrayOutputStream out;
+  private ArrowFileWriter writer;
+
+  public ArrowConverter(Schema schema, int initialSize) {
+    this.arrowSchema = ArrowUtils.toArrowSchema(schema, TimeZone.getDefault().getID());
+    this.allocator =
+        ArrowUtils.rootAllocator.newChildAllocator("toArrowBuffer", initialSize, Long.MAX_VALUE);
+    this.root = VectorSchemaRoot.create(arrowSchema, allocator);
+    this.arrowWriter = ArrowWriter.create(root);
+    this.out = new ByteArrayOutputStream();
+    this.writer = new ArrowFileWriter(root, null, Channels.newChannel(out));
+  }
+
+  public void addToArrowBuffer(Object[] data) {
+    // Each element of data is one row (an Object[]) to append to the vectors.
+    for (Object row : data) {
+      arrowWriter.write((Object[]) row);
+    }
+  }
+
+  public byte[] toSerializeArray() throws IOException {
+    arrowWriter.finish();
+    writer.writeBatch();
+    arrowWriter.reset();
+    writer.close();
+    this.root.close();
+    return out.toByteArray();
+  }
+
+  public void close() {
+    // The root and writer are closed in toSerializeArray(); release the
+    // child allocator once the serialized bytes have been handed out.
+    this.allocator.close();
+  }
+}
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowFieldWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowFieldWriter.java
new file mode 100644
index 0000000..38c878f
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowFieldWriter.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.sdk.file.arrow;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+
+/**
+ * Writes values of one column into an Arrow ValueVector; one concrete
+ * subclass exists per supported data type.
+ */
+public abstract class ArrowFieldWriter {
+
+  private ValueVector valueVector;
+
+  protected int count;
+
+  public ArrowFieldWriter(ValueVector valueVector) {
+    this.valueVector = valueVector;
+  }
+
+  public abstract void setNull();
+
+  public abstract void setValue(Object data, int ordinal);
+
+  public void write(Object data, int ordinal) {
+    if (data == null) {
+      setNull();
+    } else {
+      setValue(data, ordinal);
+    }
+    count += 1;
+  }
+
+  public void finish() {
+    valueVector.setValueCount(count);
+  }
+
+  public void reset() {
+    valueVector.reset();
+    count = 0;
+  }
+}
+
+class BooleanWriter extends ArrowFieldWriter {
+
+  private BitVector bitVector;
+
+  public BooleanWriter(BitVector bitVector) {
+    super(bitVector);
+    this.bitVector = bitVector;
+  }
+
+  @Override public void setNull() {
+    bitVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    bitVector.setSafe(count, (Boolean) data ? 1 : 0);
+  }
+}
+
+class ByteWriter extends ArrowFieldWriter {
+  private TinyIntVector tinyIntVector;
+
+  public ByteWriter(TinyIntVector tinyIntVector) {
+    super(tinyIntVector);
+    this.tinyIntVector = tinyIntVector;
+  }
+
+  @Override public void setNull() {
+    this.tinyIntVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.tinyIntVector.setSafe(count, (byte) data);
+  }
+}
+
+class ShortWriter extends ArrowFieldWriter {
+  private SmallIntVector smallIntVector;
+
+  public ShortWriter(SmallIntVector smallIntVector) {
+    super(smallIntVector);
+    this.smallIntVector = smallIntVector;
+  }
+
+  @Override public void setNull() {
+    this.smallIntVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.smallIntVector.setSafe(count, (short) data);
+  }
+}
+
+class IntWriter extends ArrowFieldWriter {
+  private IntVector intVector;
+
+  public IntWriter(IntVector intVector) {
+    super(intVector);
+    this.intVector = intVector;
+  }
+
+  @Override public void setNull() {
+    this.intVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.intVector.setSafe(count, (int) data);
+  }
+}
+
+class LongWriter extends ArrowFieldWriter {
+  private BigIntVector bigIntVector;
+
+  public LongWriter(BigIntVector bigIntVector) {
+    super(bigIntVector);
+    this.bigIntVector = bigIntVector;
+  }
+
+  @Override public void setNull() {
+    this.bigIntVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.bigIntVector.setSafe(count, (long) data);
+  }
+}
+
+class FloatWriter extends ArrowFieldWriter {
+  private Float4Vector float4Vector;
+
+  public FloatWriter(Float4Vector float4Vector) {
+    super(float4Vector);
+    this.float4Vector = float4Vector;
+  }
+
+  @Override public void setNull() {
+    this.float4Vector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.float4Vector.setSafe(count, (float) data);
+  }
+}
+
+class DoubleWriter extends ArrowFieldWriter {
+  private Float8Vector float8Vector;
+
+  public DoubleWriter(Float8Vector float8Vector) {
+    super(float8Vector);
+    this.float8Vector = float8Vector;
+  }
+
+  @Override public void setNull() {
+    this.float8Vector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    this.float8Vector.setSafe(count, (double) data);
+  }
+}
+
+class StringWriter extends ArrowFieldWriter {
+  private VarCharVector varCharVector;
+
+  public StringWriter(VarCharVector varCharVector) {
+    super(varCharVector);
+    this.varCharVector = varCharVector;
+  }
+
+  @Override public void setNull() {
+    this.varCharVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    // TODO check if it works with a Java String object instead of a UTF-8 string
+    byte[] bytes = ((String) data).getBytes();
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    this.varCharVector.setSafe(count, byteBuffer, byteBuffer.position(), bytes.length);
+  }
+}
+
+class BinaryWriter extends ArrowFieldWriter {
+  private VarBinaryVector varBinaryVector;
+
+  public BinaryWriter(VarBinaryVector varBinaryVector) {
+    super(varBinaryVector);
+    this.varBinaryVector = varBinaryVector;
+  }
+
+  @Override public void setNull() {
+    this.varBinaryVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    byte[] bytes = (byte[]) data;
+    varBinaryVector.setSafe(count, bytes, 0, bytes.length);
+  }
+}
+
+class DecimalWriter extends ArrowFieldWriter {
+  private final int precision;
+  private final int scale;
+  private DecimalVector decimalVector;
+
+  public DecimalWriter(DecimalVector decimalVector, int precision, int scale) {
+    super(decimalVector);
+    this.decimalVector = decimalVector;
+    this.precision = precision;
+    this.scale = scale;
+  }
+
+  @Override public void setNull() {
+    this.decimalVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    BigDecimal decimal = (BigDecimal) data;
+    decimalVector.setSafe(count, decimal);
+  }
+}
+
+class ArrayWriter extends ArrowFieldWriter {
+  private ListVector listVector;
+  private ArrowFieldWriter elementWriter;
+
+  public ArrayWriter(ListVector listVector, ArrowFieldWriter elementWriter) {
+    super(listVector);
+    this.listVector = listVector;
+    this.elementWriter = elementWriter;
+  }
+
+  @Override public void setNull() {
+
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    Object[] array = (Object[]) data;
+    int i = 0;
+    listVector.startNewValue(count);
+    while (i < array.length) {
+      elementWriter.write(array[i], i);
+      i += 1;
+    }
+    listVector.endValue(count, array.length);
+  }
+
+  public void finish() {
+    super.finish();
+    elementWriter.finish();
+  }
+
+  public void reset() {
+    super.reset();
+    elementWriter.reset();
+  }
+}
+
+class StructWriter extends ArrowFieldWriter {
+  private StructVector structVector;
+  private ArrowFieldWriter[] children;
+
+  public StructWriter(StructVector structVector, ArrowFieldWriter[] children) {
+    super(structVector);
+    this.structVector = structVector;
+    this.children = children;
+  }
+
+  @Override public void setNull() {
+    int i = 0;
+    while (i < children.length) {
+      children[i].setNull();
+      children[i].count += 1;
+      i += 1;
+    }
+    structVector.setNull(count);
+  }
+
+  @Override public void setValue(Object data, int ordinal) {
+    Object[] struct = (Object[]) data;
+    int i = 0;
+    while (i < struct.length) {
+      children[i].write(struct[i], i);
+      i += 1;
+    }
+    structVector.setIndexDefined(count);
+  }
+
+  public void finish() {
+    super.finish();
+    for (int i = 0; i < children.length; i++) {
+      children[i].finish();
+    }
+  }
+
+  public void reset() {
+    super.reset();
+    for (int i = 0; i < children.length; i++) {
+      children[i].reset();
+    }
+  }
+}
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowUtils.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowUtils.java
new file mode 100644
index 0000000..1374204
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.sdk.file.arrow;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+
+// TODO check with ravi
+/**
+ * Utilities to map the carbon SDK schema and its data types to the
+ * equivalent Arrow schema and types.
+ */
+public class ArrowUtils {
+
+  public static RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+
+  public static ArrowType toArrowType(DataType carbonDataType, String timeZoneId) {
+    if (carbonDataType == DataTypes.STRING) {
+      return ArrowType.Utf8.INSTANCE;
+    } else if (carbonDataType == DataTypes.BYTE) {
+      return new ArrowType.Int(8, true);
+    } else if (carbonDataType == DataTypes.SHORT) {
+      return new ArrowType.Int(8 * 2, true);
+    } else if (carbonDataType == DataTypes.INT) {
+      return new ArrowType.Int(8 * 4, true);
+    } else if (carbonDataType == DataTypes.LONG) {
+      return new ArrowType.Int(8 * 8, true);
+    } else if (carbonDataType == DataTypes.FLOAT) {
+      return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+    } else if (carbonDataType == DataTypes.DOUBLE) {
+      return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+    } else if (carbonDataType == DataTypes.BOOLEAN) {
+      return ArrowType.Bool.INSTANCE;
+    } else if (DataTypes.isDecimal(carbonDataType)) {
+      DecimalType decimal = (DecimalType) carbonDataType;
+      return new ArrowType.Decimal(decimal.getPrecision(), decimal.getScale());
+    } else if (carbonDataType == DataTypes.TIMESTAMP) {
+      return new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId);
+    } else if (carbonDataType == DataTypes.DATE) {
+      return new ArrowType.Date(DateUnit.DAY);
+    } else if (carbonDataType == DataTypes.BINARY) {
+      return ArrowType.Binary.INSTANCE;
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported carbon data type: " + carbonDataType);
+    }
+  }
+
+  public static org.apache.arrow.vector.types.pojo.Field toArrowField(String name,
+      DataType dataType, String timeZoneId) {
+    if (DataTypes.isArrayType(dataType)) {
+      FieldType fieldType = new FieldType(true, ArrowType.List.INSTANCE, null);
+      List<org.apache.arrow.vector.types.pojo.Field> structFields = new ArrayList<>();
+      structFields
+          .add(toArrowField("element", ((ArrayType) dataType).getElementType(), timeZoneId));
+      return new org.apache.arrow.vector.types.pojo.Field(name, fieldType, structFields);
+      // TODO check with RAVI
+    } else if (DataTypes.isStructType(dataType)) {
+      final StructType dataType1 = (StructType) dataType;
+      FieldType fieldType = new FieldType(true, ArrowType.Struct.INSTANCE, null);
+      List<StructField> fields = dataType1.getFields();
+      List<org.apache.arrow.vector.types.pojo.Field> structFields = new ArrayList<>();
+      for (int i = 0; i < fields.size(); i++) {
+        structFields.add(
+            toArrowField(fields.get(i).getFieldName(), fields.get(i).getDataType(), timeZoneId));
+      }
+      return new org.apache.arrow.vector.types.pojo.Field(name, fieldType, structFields);
+    } else {
+      FieldType fieldType = new FieldType(true, toArrowType(dataType, timeZoneId), null);
+      return new org.apache.arrow.vector.types.pojo.Field(name, fieldType,
+          new ArrayList<org.apache.arrow.vector.types.pojo.Field>());
+    }
+  }
+
+  public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(Schema carbonSchema,
+      String timeZoneId) {
+    final Field[] fields = carbonSchema.getFields();
+    Set<org.apache.arrow.vector.types.pojo.Field> arrowField = new LinkedHashSet<>();
+    for (int i = 0; i < fields.length; i++) {
+      arrowField.add(toArrowField(fields[i].getFieldName(), fields[i].getDataType(), timeZoneId));
+    }
+    return new org.apache.arrow.vector.types.pojo.Schema(arrowField);
+  }
+}
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowWriter.java
new file mode 100644
index 0000000..f0645dd
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/arrow/ArrowWriter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.sdk.file.arrow;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.sdk.file.Schema;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+
+/**
+ * Writes rows into the field vectors of a VectorSchemaRoot, delegating
+ * each column to a type-specific ArrowFieldWriter.
+ */
+public class ArrowWriter {
+
+  private VectorSchemaRoot root;
+
+  private ArrowFieldWriter[] children;
+
+  private int count;
+
+  public void write(Object[] data) {
+    int i = 0;
+    while (i < children.length) {
+      children[i].write(data[i], i);
+      i += 1;
+    }
+    count += 1;
+  }
+
+  public void finish() {
+    root.setRowCount(count);
+    for (int i = 0; i < children.length; i++) {
+      children[i].finish();
+    }
+  }
+
+  public void reset() {
+    root.setRowCount(0);
+    count = 0;
+    for (int i = 0; i < children.length; i++) {
+      children[i].reset();
+    }
+  }
+
+  private ArrowWriter(VectorSchemaRoot root, ArrowFieldWriter[] children) {
+    this.root = root;
+    this.children = children;
+  }
+
+  public static ArrowWriter create(Schema schema, String timeZoneId) {
+    org.apache.arrow.vector.types.pojo.Schema arrowSchema =
+        ArrowUtils.toArrowSchema(schema, timeZoneId);
+    VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator);
+    return create(root);
+  }
+
+  public static ArrowWriter create(VectorSchemaRoot root) {
+    final List<FieldVector> fieldVectors = root.getFieldVectors();
+    ArrowFieldWriter[] fieldWriters = new ArrowFieldWriter[fieldVectors.size()];
+    int i = 0;
+    for (FieldVector fieldVector : fieldVectors) {
+      fieldWriters[i] = createFieldWriter(fieldVector);
+      i++;
+    }
+    return new ArrowWriter(root, fieldWriters);
+  }
+
+  private static ArrowFieldWriter createFieldWriter(ValueVector valueVector) {
+    if (valueVector instanceof BitVector) {
+      return new BooleanWriter((BitVector) valueVector);
+    } else if (valueVector instanceof TinyIntVector) {
+      return new ByteWriter((TinyIntVector) valueVector);
+    } else if (valueVector instanceof SmallIntVector) {
+      return new ShortWriter((SmallIntVector) valueVector);
+    } else if (valueVector instanceof IntVector) {
+      return new IntWriter((IntVector) valueVector);
+    } else if (valueVector instanceof BigIntVector) {
+      return new LongWriter((BigIntVector) valueVector);
+    } else if (valueVector instanceof DecimalVector) {
+      DecimalVector decimalVector = (DecimalVector) valueVector;
+      final Field field = decimalVector.getField();
+      ArrowType.Decimal c = (ArrowType.Decimal) field.getType();
+      return new DecimalWriter((DecimalVector) valueVector, c.getPrecision(), c.getScale());
+    } else if (valueVector instanceof VarCharVector) {
+      return new StringWriter((VarCharVector) valueVector);
+    } else if (valueVector instanceof Float4Vector) {
+      return new FloatWriter((Float4Vector) valueVector);
+    } else if (valueVector instanceof Float8Vector) {
+      return new DoubleWriter((Float8Vector) valueVector);
+    } else if (valueVector instanceof ListVector) {
+      ArrowFieldWriter elementVector =
+          createFieldWriter(((ListVector) valueVector).getDataVector());
+      return new ArrayWriter((ListVector) valueVector, elementVector);
+    } else if (valueVector instanceof StructVector) {
+      StructVector s = (StructVector) valueVector;
+      List<ArrowFieldWriter> arrowFieldWriters = new ArrayList<>();
+      for (int i = 0; i < s.size(); i++) {
+        arrowFieldWriters.add(createFieldWriter(s.getChildByOrdinal(i)));
+      }
+      return new StructWriter(s,
+          arrowFieldWriters.toArray(new ArrowFieldWriter[arrowFieldWriters.size()]));
+    } else if (valueVector instanceof VarBinaryVector) {
+      return new BinaryWriter((VarBinaryVector) valueVector);
+    } else {
+      throw new UnsupportedOperationException(
+          "Unsupported vector type: " + valueVector.getClass().getSimpleName());
+    }
+  }
+}
+
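
The bytes produced by ArrowConverter.toSerializeArray() (and hence by
CarbonReader.readArrowBatch()) form a complete Arrow file, so any Arrow
implementation can deserialize them. A minimal Java sketch against the
Arrow 0.12 API used above (the method name and TSV printing are
illustrative only):

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowFileReader;
    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

    static void printArrowBytes(byte[] arrowBytes) throws java.io.IOException {
      try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
           ArrowFileReader reader = new ArrowFileReader(
               new ByteArrayReadableSeekableByteChannel(arrowBytes), allocator)) {
        while (reader.loadNextBatch()) {
          // Each call materializes one record batch into the shared root.
          VectorSchemaRoot root = reader.getVectorSchemaRoot();
          System.out.println(root.contentToTSVString());
        }
      }
    }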