You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/08/06 11:04:52 UTC

[5/5] carbondata git commit: [CARBONDATA-2825][CARBONDATA-2828] CarbonStore and InternalCarbonStore API This closes #2589

[CARBONDATA-2825][CARBONDATA-2828] CarbonStore and InternalCarbonStore API
This closes #2589


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9f10122a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9f10122a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9f10122a

Branch: refs/heads/carbonstore
Commit: 9f10122af65fcd518bfe8855c4eaa1a423e81caa
Parents: a6027ae
Author: Jacky Li <ja...@qq.com>
Authored: Wed Aug 1 02:16:26 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Aug 6 19:03:12 2018 +0800

----------------------------------------------------------------------
 .../common/annotations/InterfaceStability.java  |   2 +-
 .../core/datastore/row/CarbonRow.java           |  30 ++-
 .../core/metadata/schema/table/TableInfo.java   |   2 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  44 +++-
 .../hadoop/api/CarbonInputFormat.java           |  28 ++-
 .../hadoop/api/CarbonTableInputFormat.java      |  11 +
 .../hadoop/readsupport/CarbonReadSupport.java   |   3 +-
 .../apache/carbondata/store/WorkerManager.scala |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  22 +-
 .../carbondata/zeppelin/TestCarbonResponse.java |   4 +-
 store/conf/store.conf                           |   4 +-
 store/core/pom.xml                              |   4 +-
 .../carbondata/store/api/CarbonStore.java       |  32 ---
 .../store/api/CarbonStoreFactory.java           |  93 --------
 .../apache/carbondata/store/api/DataStore.java  |  51 ----
 .../apache/carbondata/store/api/MetaStore.java  |  50 ----
 .../apache/carbondata/store/api/SqlStore.java   |  34 ---
 .../carbondata/store/api/conf/StoreConf.java    | 197 ----------------
 .../store/api/descriptor/LoadDescriptor.java    | 114 ---------
 .../store/api/descriptor/SelectDescriptor.java  | 111 ---------
 .../store/api/descriptor/TableDescriptor.java   | 174 --------------
 .../store/api/descriptor/TableIdentifier.java   |  37 ---
 .../exception/ExecutionTimeoutException.java    |  22 --
 .../store/api/exception/SchedulerException.java |  26 ---
 .../store/api/exception/StoreException.java     |  33 ---
 .../carbondata/store/devapi/DataLoader.java     |  49 ++++
 .../carbondata/store/devapi/DataScanner.java    |  51 ++++
 .../store/devapi/InternalCarbonStore.java       |  72 ++++++
 .../devapi/InternalCarbonStoreFactory.java      |  45 ++++
 .../apache/carbondata/store/devapi/Pruner.java  |  46 ++++
 .../carbondata/store/devapi/ResultBatch.java    |  47 ++++
 .../carbondata/store/devapi/ScanOption.java     |  69 ++++++
 .../carbondata/store/devapi/ScanUnit.java       |  41 ++++
 .../apache/carbondata/store/devapi/Scanner.java |  33 +++
 .../store/devapi/TransactionalOperation.java    |  35 +++
 .../carbondata/store/impl/BlockScanUnit.java    |  74 ++++++
 .../carbondata/store/impl/CarbonStoreBase.java  | 177 --------------
 .../carbondata/store/impl/DataOperation.java    |  95 ++++++++
 .../carbondata/store/impl/DataServicePool.java  |  41 ++++
 .../carbondata/store/impl/DelegatedScanner.java |  57 +++++
 .../store/impl/DistributedCarbonStore.java      | 232 -------------------
 .../carbondata/store/impl/IndexOperation.java   |  61 +++++
 .../store/impl/IndexedRecordReader.java         |   2 +-
 .../store/impl/InternalCarbonStoreImpl.java     | 110 +++++++++
 .../carbondata/store/impl/LocalCarbonStore.java |  95 +++++---
 .../carbondata/store/impl/LocalDataScanner.java |  73 ++++++
 .../carbondata/store/impl/LocalPruner.java      |  58 +++++
 .../carbondata/store/impl/MetaOperation.java    | 212 +++++++++++++++++
 .../carbondata/store/impl/MetaProcessor.java    | 170 --------------
 .../store/impl/RemoteDataScanner.java           |  87 +++++++
 .../carbondata/store/impl/RemotePruner.java     |  56 +++++
 .../store/impl/RowMajorResultBatch.java         |  49 ++++
 .../carbondata/store/impl/Schedulable.java      | 100 ++++++++
 .../carbondata/store/impl/master/Master.java    | 172 +++++++++++---
 .../store/impl/master/PruneServiceImpl.java     |  89 +++++++
 .../store/impl/master/RegistryServiceImpl.java  |   8 +-
 .../store/impl/master/Schedulable.java          |  76 ------
 .../carbondata/store/impl/master/Scheduler.java |  71 +-----
 .../store/impl/master/StoreServiceImpl.java     | 100 ++++++++
 .../store/impl/rpc/RegistryService.java         |  32 ---
 .../store/impl/rpc/ServiceFactory.java          |  43 ----
 .../carbondata/store/impl/rpc/StoreService.java |  40 ----
 .../store/impl/rpc/model/BaseResponse.java      |  69 ------
 .../store/impl/rpc/model/LoadDataRequest.java   |  60 -----
 .../store/impl/rpc/model/QueryResponse.java     |  73 ------
 .../impl/rpc/model/RegisterWorkerRequest.java   |  73 ------
 .../impl/rpc/model/RegisterWorkerResponse.java  |  54 -----
 .../carbondata/store/impl/rpc/model/Scan.java   | 108 ---------
 .../store/impl/rpc/model/ShutdownRequest.java   |  53 -----
 .../store/impl/rpc/model/ShutdownResponse.java  |  61 -----
 .../store/impl/service/DataService.java         |  53 +++++
 .../store/impl/service/PruneService.java        |  41 ++++
 .../store/impl/service/RegistryService.java     |  32 +++
 .../store/impl/service/ServiceFactory.java      |  50 ++++
 .../store/impl/service/model/BaseResponse.java  |  69 ++++++
 .../impl/service/model/LoadDataRequest.java     |  60 +++++
 .../store/impl/service/model/PruneRequest.java  |  64 +++++
 .../store/impl/service/model/PruneResponse.java |  67 ++++++
 .../service/model/RegisterWorkerRequest.java    |  73 ++++++
 .../service/model/RegisterWorkerResponse.java   |  54 +++++
 .../store/impl/service/model/ScanRequest.java   | 108 +++++++++
 .../store/impl/service/model/ScanResponse.java  |  73 ++++++
 .../impl/service/model/ShutdownRequest.java     |  53 +++++
 .../impl/service/model/ShutdownResponse.java    |  61 +++++
 .../store/impl/worker/DataServiceImpl.java      | 174 ++++++++++++++
 .../store/impl/worker/RequestHandler.java       | 166 -------------
 .../store/impl/worker/StoreServiceImpl.java     |  77 ------
 .../carbondata/store/impl/worker/Worker.java    |  26 ++-
 .../apache/carbondata/store/util/StoreUtil.java | 134 -----------
 .../store/DistributedCarbonStoreTest.java       |  59 +++--
 .../carbondata/store/LocalCarbonStoreTest.java  |  39 ++--
 .../org/apache/carbondata/store/TestUtil.java   |  10 -
 .../horizon/rest/client/HorizonClient.java      |  10 +-
 .../rest/client/impl/SimpleHorizonClient.java   |  12 +-
 .../horizon/rest/controller/Horizon.java        |   3 +-
 .../rest/controller/HorizonController.java      |  68 +++---
 .../rest/model/validate/RequestValidator.java   |  38 +--
 .../rest/model/view/CreateTableRequest.java     |   4 +-
 .../horizon/rest/model/view/FieldRequest.java   |   2 +-
 .../horizon/rest/model/view/LoadRequest.java    |   4 +-
 .../horizon/rest/model/view/SelectResponse.java |  11 +-
 .../apache/carbondata/horizon/HorizonTest.java  |  10 +-
 store/sdk/pom.xml                               |   4 +-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  50 ++--
 .../sdk/file/CarbonWriterBuilder.java           |   4 +-
 .../org/apache/carbondata/sdk/file/Field.java   |  15 +-
 .../org/apache/carbondata/sdk/file/Schema.java  |   3 +-
 .../carbondata/sdk/store/CarbonStore.java       | 151 ++++++++++++
 .../sdk/store/CarbonStoreFactory.java           |  93 ++++++++
 .../sdk/store/DistributedCarbonStore.java       | 129 +++++++++++
 .../apache/carbondata/sdk/store/KeyedRow.java   |  45 ++++
 .../apache/carbondata/sdk/store/PrimaryKey.java |  26 +++
 .../org/apache/carbondata/sdk/store/Row.java    |  27 +++
 .../carbondata/sdk/store/conf/StoreConf.java    | 207 +++++++++++++++++
 .../sdk/store/descriptor/LoadDescriptor.java    | 149 ++++++++++++
 .../sdk/store/descriptor/ScanDescriptor.java    | 151 ++++++++++++
 .../sdk/store/descriptor/TableDescriptor.java   | 214 +++++++++++++++++
 .../sdk/store/descriptor/TableIdentifier.java   |  63 +++++
 .../sdk/store/exception/CarbonException.java    |  38 +++
 .../exception/ExecutionTimeoutException.java    |  27 +++
 .../sdk/store/exception/SchedulerException.java |  31 +++
 .../sdk/store/service/ServiceFactory.java       |  41 ++++
 .../sdk/store/service/StoreService.java         |  53 +++++
 .../carbondata/sdk/store/util/StoreUtil.java    | 134 +++++++++++
 .../rest/controller/SqlHorizonController.java   |   8 +-
 .../rest/model/validate/RequestValidator.java   |   8 +-
 126 files changed, 4912 insertions(+), 3015 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
index 5435028..afd863f 100644
--- a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
+++ b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
@@ -42,7 +42,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience.*;
  * </ul>
  */
 @InterfaceAudience.User
-@org.apache.hadoop.classification.InterfaceStability.Evolving
+@InterfaceStability.Evolving
 public class InterfaceStability {
   /**
    * Can evolve while retaining compatibility for minor release boundaries.;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index 48775d4..1f1f087 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -17,13 +17,21 @@
 
 package org.apache.carbondata.core.datastore.row;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.io.WritableUtils;
+
 /**
  * This row class is used to transfer the row data from one step to other step
  */
-public class CarbonRow implements Serializable {
+public class CarbonRow implements Serializable, Writable {
 
   private Object[] data;
 
@@ -87,4 +95,24 @@ public class CarbonRow implements Serializable {
   public void clearData() {
     this.data = null;
   }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(data));
+    WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rawData));
+    out.writeShort(rangeId);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    try {
+      data = (Object[]) ObjectSerializationUtil.deserialize(
+          WritableUtils.readCompressedByteArray(in));
+      rawData = (Object[]) ObjectSerializationUtil.deserialize(
+          WritableUtils.readCompressedByteArray(in));
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    rangeId = in.readShort();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 46328f7..abe1810 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTable
  * Store the information about the table.
  * it stores the fact table as well as aggregate table present in the schema
  */
-public class TableInfo implements Serializable, Writable {
+public class TableInfo implements Serializable, Writable, org.apache.hadoop.io.Writable {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(TableInfo.class.getName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 405ff53..dd3e63f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
@@ -56,6 +57,11 @@ public class CarbonInputSplit extends FileSplit
 
   private Segment segment;
 
+  // We use this filePath to store the block location instead of the
+  // filePath in FileSplit, because filePath in FileSplit is not Serializable
+  // before Hadoop 3, see HADOOP-13519
+  private String filePath;
+
   private String bucketId;
 
   private String blockletId;
@@ -98,6 +104,7 @@ public class CarbonInputSplit extends FileSplit
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
+    filePath = "";
   }
 
   private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
@@ -116,6 +123,7 @@ public class CarbonInputSplit extends FileSplit
     this.version = version;
     this.deleteDeltaFiles = deleteDeltaFiles;
     this.dataMapWritePath = dataMapWritePath;
+    this.filePath = path.toString();
   }
 
   public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
@@ -136,6 +144,7 @@ public class CarbonInputSplit extends FileSplit
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
+    filePath = path.toString();
   }
 
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
@@ -149,6 +158,7 @@ public class CarbonInputSplit extends FileSplit
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
     version = CarbonProperties.getInstance().getFormatVersion();
+    filePath = path.toString();
   }
 
   /**
@@ -252,10 +262,24 @@ public class CarbonInputSplit extends FileSplit
     if (dataMapWriterPathExists) {
       dataMapWritePath = in.readUTF();
     }
+    boolean filePathExists = in.readBoolean();
+    if (filePathExists) {
+      filePath = in.readUTF();
+    } else {
+      filePath = super.getPath().toString();
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
-    super.write(out);
+    if (super.getPath() != null) {
+      super.write(out);
+    } else {
+      // see HADOOP-13519, after Java deserialization, super.filePath is
+      // null, so write our filePath instead
+      Text.writeString(out, filePath);
+      out.writeLong(getStart());
+      out.writeLong(getLength());
+    }
     out.writeUTF(segment.toString());
     out.writeShort(version.number());
     out.writeUTF(bucketId);
@@ -278,6 +302,10 @@ public class CarbonInputSplit extends FileSplit
     if (dataMapWritePath != null) {
       out.writeUTF(dataMapWritePath);
     }
+    out.writeBoolean(filePath != null);
+    if (filePath != null) {
+      out.writeUTF(filePath);
+    }
   }
 
   public List<String> getInvalidSegments() {
@@ -398,7 +426,7 @@ public class CarbonInputSplit extends FileSplit
   }
 
   @Override public String getBlockPath() {
-    return getPath().getName();
+    return filePath.substring(filePath.lastIndexOf("/") + 1);
   }
 
   @Override public List<Long> getMatchedBlocklets() {
@@ -444,4 +472,16 @@ public class CarbonInputSplit extends FileSplit
   public Blocklet makeBlocklet() {
     return new Blocklet(getPath().getName(), blockletId);
   }
+
+  public String[] preferredLocations() {
+    if (CarbonProperties.isTaskLocality()) {
+      try {
+        return getLocations();
+      } catch (IOException e) {
+        return new String[0];
+      }
+    } else {
+      return new String[0];
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 70c530f..ce0dc72 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -185,9 +185,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * It sets unresolved filter expression.
    *
-   * @param configuration
-   * @para    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-m filterExpression
+   * @param configuration Hadoop conf
+   * @param filterExpression filter expression
    */
   public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
     if (filterExpression == null) {
@@ -245,6 +244,17 @@ m filterExpression
     return configuration.get(COLUMN_PROJECTION);
   }
 
+  public static String[] getProjectionColumns(Configuration configuration) {
+    String projectionString = getColumnProjection(configuration);
+    String[] projectColumns;
+    if (projectionString != null) {
+      projectColumns = projectionString.split(",");
+    } else {
+      projectColumns = new String[]{};
+    }
+    return projectColumns;
+  }
+
   public static void setFgDataMapPruning(Configuration configuration, boolean enable) {
     configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable));
   }
@@ -353,7 +363,7 @@ m filterExpression
    */
   @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
 
-  protected Expression getFilterPredicates(Configuration configuration) {
+  public static Expression getFilterPredicates(Configuration configuration) {
     try {
       String filterExprString = configuration.get(FILTER_PREDICATE);
       if (filterExprString == null) {
@@ -524,7 +534,7 @@ m filterExpression
     return prunedBlocklets;
   }
 
-  private List<ExtendedBlocklet> getPrunedFiles4ExternalFormat(JobContext job,
+  public List<ExtendedBlocklet> getPrunedFiles4ExternalFormat(JobContext job,
       CarbonTable carbonTable,
       FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException {
     ExplainCollector.addPruningInfo(carbonTable.getTableName());
@@ -664,13 +674,7 @@ m filterExpression
     CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
 
     // set projection column in the query model
-    String projectionString = getColumnProjection(configuration);
-    String[] projectColumns;
-    if (projectionString != null) {
-      projectColumns = projectionString.split(",");
-    } else {
-      projectColumns = new String[]{};
-    }
+    String[] projectColumns = getProjectionColumns(configuration);
     QueryModel queryModel = new QueryModelBuilder(carbonTable)
         .projectColumns(projectColumns)
         .filterExpression(getFilterPredicates(configuration))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 84e36e3..267ba0f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -71,7 +71,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 
 /**
  * InputFormat for reading carbondata files with table level metadata support,
@@ -98,6 +100,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private CarbonTable carbonTable;
   private ReadCommittedScope readCommittedScope;
 
+  public CarbonTableInputFormat() {
+  }
+
+  public CarbonTableInputFormat(Configuration conf) throws IOException {
+    this.carbonTable = getOrCreateCarbonTable(conf);
+    this.readCommittedScope = getReadCommitted(
+        new JobContextImpl(conf, new JobID()), carbonTable.getAbsoluteTableIdentifier());
+  }
+
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
index c126e95..78a2e7f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.hadoop.readsupport;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -24,7 +25,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 /**
  * This is the interface to convert data reading from RecordReader to row representation.
  */
-public interface CarbonReadSupport<T> {
+public interface CarbonReadSupport<T> extends Serializable {
 
   /**
    * Initialization if needed based on the projected column list

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
index 7fff2e5..c624702 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
@@ -25,9 +25,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.store.CarbonStoreFactory
+import org.apache.carbondata.sdk.store.conf.StoreConf
 import org.apache.carbondata.spark.util.Util
-import org.apache.carbondata.store.api.CarbonStoreFactory
-import org.apache.carbondata.store.api.conf.StoreConf
 import org.apache.carbondata.store.impl.worker.Worker
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 6c13955..3fccfef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -22,9 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
@@ -35,18 +33,16 @@ import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.profiler.{Profiler, SQLStart}
-import org.apache.spark.util.{CarbonReflectionUtils, Utils}
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.sdk.store.{CarbonStore, CarbonStoreFactory}
+import org.apache.carbondata.sdk.store.conf.StoreConf
+import org.apache.carbondata.sdk.store.descriptor.{ScanDescriptor, TableIdentifier}
 import org.apache.carbondata.store.WorkerManager
-import org.apache.carbondata.store.api.{CarbonStore, CarbonStoreFactory}
-import org.apache.carbondata.store.api.conf.StoreConf
-import org.apache.carbondata.store.api.descriptor.{SelectDescriptor, TableIdentifier => CTableIdentifier}
-import org.apache.carbondata.streaming.CarbonStreamingQueryListener
 
 /**
  * Session implementation for {org.apache.spark.sql.SparkSession}
@@ -204,7 +200,7 @@ class CarbonSession(@transient val sc: SparkContext,
       val storeConf = new StoreConf()
       storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath)
       storeConf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress)
-      storeConf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort)
+      storeConf.conf(StoreConf.REGISTRY_PORT, CarbonProperties.getSearchMasterPort)
       storeConf.conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost.getHostAddress)
       storeConf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort)
       storeConf.conf(StoreConf.WORKER_CORE_NUM, 2)
@@ -241,13 +237,13 @@ class CarbonSession(@transient val sc: SparkContext,
       maxRows: Option[Long] = None,
       localMaxRows: Option[Long] = None): DataFrame = {
     val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
-    val select = new SelectDescriptor(
-      new CTableIdentifier(table.getTableName, table.getDatabaseName),
+    val select = new ScanDescriptor(
+      new TableIdentifier(table.getTableName, table.getDatabaseName),
       columns.map(_.name).toArray,
       if (expr != null) CarbonFilters.transformExpression(expr) else null,
       localMaxRows.getOrElse(Long.MaxValue)
     )
-    val rows = store.select(select).iterator()
+    val rows = store.scan(select).iterator()
     val output = new java.util.ArrayList[Row]()
     val maxRowCount = maxRows.getOrElse(Long.MaxValue)
     var rowCount = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
----------------------------------------------------------------------
diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
index 4b169f4..1f1ce65 100644
--- a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
+++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
@@ -76,7 +76,7 @@ public class TestCarbonResponse {
             "    \"timestamp\": 1531884083849,\n" +
             "    \"status\": 500,\n" +
             "    \"error\": \"Internal Server Error\",\n" +
-            "    \"exception\": \"org.apache.carbondata.store.api.exception.StoreException\",\n" +
+            "    \"exception\": \"org.apache.carbondata.store.api.exception.CarbonException\",\n" +
             "    \"message\": \"org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " +
             "Table or view 'sinka6' already exists in database 'default';\",\n" +
             "    \"path\": \"/table/sql\"\n" +
@@ -84,7 +84,7 @@ public class TestCarbonResponse {
     CarbonResponse errorResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get();
     assertEquals("org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " +
             "Table or view 'sinka6' already exists in database 'default';", errorResponse.getMessage());
-    assertEquals("org.apache.carbondata.store.api.exception.StoreException", errorResponse.getException());
+    assertEquals("org.apache.carbondata.store.api.exception.CarbonException", errorResponse.getException());
     assertEquals(1531884083849L, errorResponse.getTimestamp());
     assertEquals("Internal Server Error", errorResponse.getError());
     assertEquals(500, errorResponse.getStatus());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/conf/store.conf
----------------------------------------------------------------------
diff --git a/store/conf/store.conf b/store/conf/store.conf
index 7f18076..9902061 100644
--- a/store/conf/store.conf
+++ b/store/conf/store.conf
@@ -6,5 +6,7 @@ carbon.store.temp.location=/tmp/carbon.store.temp
 
 # worker and master
 carbon.master.host=127.0.0.1
-carbon.master.port=10020
+carbon.master.registry.port=10020
+carbon.master.prune.port=10120
+carbon.master.store.port=9020
 carbon.store.location=/tmp/carbon.store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/pom.xml
----------------------------------------------------------------------
diff --git a/store/core/pom.xml b/store/core/pom.xml
index 44d5ab1..c9e498d 100644
--- a/store/core/pom.xml
+++ b/store/core/pom.xml
@@ -48,8 +48,8 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
deleted file mode 100644
index 3525389..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.Closeable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-
-/**
- * Public Interface of CarbonStore
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface CarbonStore extends MetaStore, DataStore, Closeable {
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
deleted file mode 100644
index 76ef450..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-public class CarbonStoreFactory {
-  private static Map<String, CarbonStore> distributedStores = new ConcurrentHashMap<>();
-  private static Map<String, CarbonStore> localStores = new ConcurrentHashMap<>();
-
-  private CarbonStoreFactory() {
-  }
-
-  public static CarbonStore getDistributedStore(String storeName, StoreConf storeConf)
-      throws StoreException {
-    if (distributedStores.containsKey(storeName)) {
-      return distributedStores.get(storeName);
-    }
-
-    // create a new instance
-    try {
-      String className = "org.apache.carbondata.store.impl.DistributedCarbonStore";
-      CarbonStore store = createCarbonStore(storeConf, className);
-      distributedStores.put(storeName, store);
-      return store;
-    } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException |
-        InstantiationException e) {
-      throw new StoreException(e);
-    }
-  }
-
-  public static void removeDistributedStore(String storeName) throws IOException {
-    if (distributedStores.containsKey(storeName)) {
-      distributedStores.get(storeName).close();
-      distributedStores.remove(storeName);
-    }
-  }
-
-  public static CarbonStore getLocalStore(String storeName, StoreConf storeConf)
-      throws StoreException {
-    if (localStores.containsKey(storeName)) {
-      return localStores.get(storeName);
-    }
-
-    // create a new instance
-    try {
-      String className = "org.apache.carbondata.store.impl.LocalCarbonStore";
-      CarbonStore store = createCarbonStore(storeConf, className);
-      localStores.put(storeName, store);
-      return store;
-    } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException |
-        InstantiationException e) {
-      throw new StoreException(e);
-    }
-  }
-
-  public static void removeLocalStore(String storeName) throws IOException {
-    if (localStores.containsKey(storeName)) {
-      localStores.get(storeName).close();
-      localStores.remove(storeName);
-    }
-  }
-
-  private static CarbonStore createCarbonStore(StoreConf storeConf, String className)
-      throws ClassNotFoundException, InstantiationException, IllegalAccessException,
-      InvocationTargetException {
-    Constructor[] constructor = Class.forName(className).getDeclaredConstructors();
-    constructor[0].setAccessible(true);
-    return (CarbonStore) constructor[0].newInstance(storeConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
deleted file mode 100644
index d35c133..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-/**
- * Public interface to write and read data in CarbonStore
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface DataStore {
-
-  /**
-   * Load data into a Table
-   * @param load descriptor for load operation
-   * @throws IOException if network or disk IO error occurs
-   */
-  void loadData(LoadDescriptor load) throws IOException, StoreException;
-
-  /**
-   * Scan a Table and return matched rows
-   * @param select descriptor for scan operation, including required column, filter, etc
-   * @return matched rows
-   * @throws IOException if network or disk IO error occurs
-   */
-  List<CarbonRow> select(SelectDescriptor select) throws IOException, StoreException;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
deleted file mode 100644
index dea6873..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-/**
- * Public interface to manage table in CarbonStore
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface MetaStore {
-  /**
-   * Create a Table
-   * @param table descriptor for create table operation
-   * @throws IOException if network or disk IO error occurs
-   */
-  void createTable(TableDescriptor table) throws IOException, StoreException;
-
-  /**
-   * Drop a Table, and remove all data in it
-   * @param table table identifier
-   * @throws IOException if network or disk IO error occurs
-   */
-  void dropTable(TableIdentifier table) throws IOException;
-
-  CarbonTable getTable(TableIdentifier table) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
deleted file mode 100644
index 3f52eed..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-
-public interface SqlStore {
-
-  /**
-   * Executor a SQL statement
-   * @param sqlString SQL statement
-   * @return matched rows
-   * @throws IOException if network or disk IO error occurs
-   */
-  List<CarbonRow> sql(String sqlString) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
deleted file mode 100644
index 5e4bb4a..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.conf;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-public class StoreConf implements Serializable, Writable {
-
-  public static final String SELECT_PROJECTION = "carbon.select.projection";
-  public static final String SELECT_FILTER = "carbon.select.filter";
-  public static final String SELECT_LIMIT = "carbon.select.limit";
-
-  public static final String SELECT_ID = "carbon.select.id";
-
-  public static final String WORKER_HOST = "carbon.worker.host";
-  public static final String WORKER_PORT = "carbon.worker.port";
-  public static final String WORKER_CORE_NUM = "carbon.worker.core.num";
-  public static final String MASTER_HOST = "carbon.master.host";
-  public static final String MASTER_PORT = "carbon.master.port";
-
-  public static final String STORE_TEMP_LOCATION = "carbon.store.temp.location";
-  public static final String STORE_LOCATION = "carbon.store.location";
-  public static final String STORE_NAME = "carbon.store.name";
-
-  public static final String STORE_CONF_FILE = "carbon.store.confFile";
-
-  private Map<String, String> conf = new HashMap<>();
-
-  public StoreConf() {
-    String storeConfFile = System.getProperty(STORE_CONF_FILE);
-    if (storeConfFile != null) {
-      load(storeConfFile);
-    }
-  }
-
-  public StoreConf(String storeName, String storeLocation) {
-    conf.put(STORE_NAME, storeName);
-    conf.put(STORE_LOCATION, storeLocation);
-  }
-
-  public StoreConf(String confFilePath) {
-    load(confFilePath);
-  }
-
-  public StoreConf conf(String key, String value) {
-    conf.put(key, value);
-    return this;
-  }
-
-  public StoreConf conf(String key, int value) {
-    conf.put(key, "" + value);
-    return this;
-  }
-
-  public void load(String filePath) {
-    StoreUtil.loadProperties(filePath, this);
-  }
-
-  public void conf(StoreConf conf) {
-    this.conf.putAll(conf.conf);
-  }
-
-  public Object conf(String key) {
-    return conf.get(key);
-  }
-
-  public String[] projection() {
-    return stringArrayValue(SELECT_PROJECTION);
-  }
-
-  public String filter() {
-    return stringValue(SELECT_FILTER);
-  }
-
-  public int limit() {
-    return intValue(SELECT_LIMIT);
-  }
-
-  public String masterHost() {
-    return stringValue(MASTER_HOST);
-  }
-
-  public int masterPort() {
-    return intValue(MASTER_PORT);
-  }
-
-  public String workerHost() {
-    return stringValue(WORKER_HOST);
-  }
-
-  public int workerPort() {
-    return intValue(WORKER_PORT);
-  }
-
-  public int workerCoreNum() {
-    return intValue(WORKER_CORE_NUM);
-  }
-
-  public String storeLocation() {
-    return stringValue(STORE_LOCATION);
-  }
-
-  public String[] storeTempLocation() {
-    return stringArrayValue(STORE_TEMP_LOCATION);
-  }
-
-  public String selectId() {
-    return stringValue(SELECT_ID);
-  }
-
-  public Configuration newHadoopConf() {
-    Configuration hadoopConf = FileFactory.getConfiguration();
-    for (Map.Entry<String, String> entry : conf.entrySet()) {
-      String key = entry.getKey();
-      String value = entry.getValue();
-      if (key != null && value != null && key.startsWith("carbon.hadoop.")) {
-        hadoopConf.set(key.substring("carbon.hadoop.".length()), value);
-      }
-    }
-    return hadoopConf;
-  }
-
-  private String stringValue(String key) {
-    Object obj = conf.get(key);
-    if (obj == null) {
-      return null;
-    }
-    return obj.toString();
-  }
-
-  private int intValue(String key) {
-    String value = conf.get(key);
-    if (value == null) {
-      return -1;
-    }
-    return Integer.parseInt(value);
-  }
-
-  private String[] stringArrayValue(String key) {
-    String value = conf.get(key);
-    if (value == null) {
-      return null;
-    }
-    return value.split(",", -1);
-  }
-
-  @Override public void write(DataOutput out) throws IOException {
-    Set<Map.Entry<String, String>> entries = conf.entrySet();
-    WritableUtils.writeVInt(out, conf.size());
-    for (Map.Entry<String, String> entry : entries) {
-      WritableUtils.writeString(out, entry.getKey());
-      WritableUtils.writeString(out, entry.getValue());
-    }
-  }
-
-  @Override public void readFields(DataInput in) throws IOException {
-    if (conf == null) {
-      conf = new HashMap<>();
-    }
-
-    int size = WritableUtils.readVInt(in);
-    String key, value;
-    for (int i = 0; i < size; i++) {
-      key = WritableUtils.readString(in);
-      value = WritableUtils.readString(in);
-      conf.put(key, value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
deleted file mode 100644
index c3a4ff7..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class LoadDescriptor {
-
-  private TableIdentifier table;
-  private String inputPath;
-  private Map<String, String> options;
-  private boolean isOverwrite;
-
-  private LoadDescriptor() {
-  }
-
-  public LoadDescriptor(TableIdentifier table, String inputPath,
-      Map<String, String> options, boolean isOverwrite) {
-    Objects.requireNonNull(table);
-    Objects.requireNonNull(inputPath);
-    this.table = table;
-    this.inputPath = inputPath;
-    this.options = options;
-    this.isOverwrite = isOverwrite;
-  }
-
-  public TableIdentifier getTable() {
-    return table;
-  }
-
-  public void setTable(TableIdentifier table) {
-    this.table = table;
-  }
-
-  public String getInputPath() {
-    return inputPath;
-  }
-
-  public void setInputPath(String inputPath) {
-    this.inputPath = inputPath;
-  }
-
-  public Map<String, String> getOptions() {
-    return options;
-  }
-
-  public void setOptions(Map<String, String> options) {
-    this.options = options;
-  }
-
-  public boolean isOverwrite() {
-    return isOverwrite;
-  }
-
-  public void setOverwrite(boolean overwrite) {
-    isOverwrite = overwrite;
-  }
-
-  public static class Builder {
-    private LoadDescriptor load;
-    private Map<String, String> options;
-
-    private Builder() {
-      load = new LoadDescriptor();
-      options = new HashMap<>();
-    }
-
-    public Builder table(TableIdentifier tableIdentifier) {
-      load.setTable(tableIdentifier);
-      return this;
-    }
-
-    public Builder overwrite(boolean isOverwrite) {
-      load.setOverwrite(isOverwrite);
-      return this;
-    }
-
-    public Builder inputPath(String inputPath) {
-      load.setInputPath(inputPath);
-      return this;
-    }
-
-    public Builder options(String key, String value) {
-      options.put(key, value);
-      return this;
-    }
-
-    public LoadDescriptor create() {
-      load.setOptions(options);
-      return load;
-    }
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
deleted file mode 100644
index c3627a9..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-import java.util.Objects;
-
-import org.apache.carbondata.core.scan.expression.Expression;
-
-public class SelectDescriptor {
-
-  private TableIdentifier table;
-  private String[] projection;
-  private Expression filter;
-  private long limit;
-
-  private SelectDescriptor() {
-  }
-
-  public SelectDescriptor(TableIdentifier table, String[] projection,
-      Expression filter, long limit) {
-    Objects.requireNonNull(table);
-    Objects.requireNonNull(projection);
-    this.table = table;
-    this.projection = projection;
-    this.filter = filter;
-    this.limit = limit;
-  }
-
-  public TableIdentifier getTable() {
-    return table;
-  }
-
-  public void setTable(TableIdentifier table) {
-    this.table = table;
-  }
-
-  public String[] getProjection() {
-    return projection;
-  }
-
-  public void setProjection(String[] projection) {
-    this.projection = projection;
-  }
-
-  public Expression getFilter() {
-    return filter;
-  }
-
-  public void setFilter(Expression filter) {
-    this.filter = filter;
-  }
-
-  public long getLimit() {
-    return limit;
-  }
-
-  public void setLimit(long limit) {
-    this.limit = limit;
-  }
-
-  public static class Builder {
-    private SelectDescriptor select;
-
-    private Builder() {
-      select = new SelectDescriptor();
-    }
-
-    public Builder table(TableIdentifier tableIdentifier) {
-      select.setTable(tableIdentifier);
-      return this;
-    }
-
-    public Builder select(String... columnNames) {
-      select.setProjection(columnNames);
-      return this;
-    }
-
-    public Builder filter(Expression filter) {
-      select.setFilter(filter);
-      return this;
-    }
-
-    public Builder limit(long limit) {
-      select.setLimit(limit);
-      return this;
-    }
-
-    public SelectDescriptor create() {
-      return select;
-    }
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
deleted file mode 100644
index 2d677a8..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.sdk.file.Schema;
-
-public class TableDescriptor {
-
-  private boolean ifNotExists;
-  private TableIdentifier table;
-  private String tablePath;
-  private Schema schema;
-  private Map<String, String> properties;
-  private String comment;
-
-  private TableDescriptor() {
-  }
-
-  public TableDescriptor(TableIdentifier table, Schema schema,
-      Map<String, String> properties, String tablePath, String comment, boolean ifNotExists) {
-    Objects.requireNonNull(table);
-    Objects.requireNonNull(schema);
-    this.table = table;
-    this.ifNotExists = ifNotExists;
-    this.schema = schema;
-    this.properties = properties;
-    this.tablePath = tablePath;
-    this.comment = comment;
-  }
-
-  public boolean isIfNotExists() {
-    return ifNotExists;
-  }
-
-  public void setIfNotExists(boolean ifNotExists) {
-    this.ifNotExists = ifNotExists;
-  }
-
-  public TableIdentifier getTable() {
-    return table;
-  }
-
-  public void setTable(TableIdentifier table) {
-    this.table = table;
-  }
-
-  public Schema getSchema() {
-    return schema;
-  }
-
-  public void setSchema(Schema schema) {
-    this.schema = schema;
-  }
-
-  public Map<String, String> getProperties() {
-    return properties;
-  }
-
-  public void setProperties(Map<String, String> properties) {
-    this.properties = properties;
-  }
-
-  public String getComment() {
-    return comment;
-  }
-
-  public void setComment(String comment) {
-    this.comment = comment;
-  }
-
-  public void setTablePath(String tablePath) {
-    this.tablePath = tablePath;
-  }
-
-  public String getTablePath() {
-    return tablePath;
-  }
-
-  public static class Builder {
-
-    private TableDescriptor table;
-    private List<Field> fields;
-    private Map<String, String> tblProperties;
-
-    private Builder() {
-      table = new TableDescriptor();
-      fields = new ArrayList<>();
-      tblProperties = new HashMap<>();
-    }
-
-    public Builder ifNotExists() {
-      table.setIfNotExists(true);
-      return this;
-    }
-
-    public Builder table(TableIdentifier tableId) {
-      table.setTable(tableId);
-      return this;
-    }
-
-    public Builder tablePath(String tablePath) {
-      table.setTablePath(tablePath);
-      return this;
-    }
-
-    public Builder comment(String tableComment) {
-      table.setComment(tableComment);
-      return this;
-    }
-
-    public Builder column(String name, DataType dataType) {
-      fields.add(new Field(name, dataType));
-      return this;
-    }
-
-    public Builder column(String name, DataType dataType, String comment) {
-      Field field = new Field(name, dataType);
-      field.setColumnComment(comment);
-      fields.add(field);
-      return this;
-    }
-
-    public Builder column(String name, DataType dataType, int precision, int scale, String comment)
-    {
-      Field field = new Field(name, dataType);
-      field.setColumnComment(comment);
-      field.setScale(scale);
-      field.setPrecision(precision);
-      fields.add(field);
-      return this;
-    }
-
-    public Builder tblProperties(String key, String value) {
-      tblProperties.put(key, value);
-      return this;
-    }
-
-    public TableDescriptor create() {
-      Field[] fieldArray = new Field[fields.size()];
-      fieldArray = fields.toArray(fieldArray);
-      Schema schema = new Schema(fieldArray);
-      table.setSchema(schema);
-      table.setProperties(tblProperties);
-      return table;
-    }
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
deleted file mode 100644
index ab8edf8..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-public class TableIdentifier {
-  private String tableName;
-  private String databaseName;
-
-  public TableIdentifier(String tableName, String databaseName) {
-    this.tableName = tableName;
-    this.databaseName = databaseName;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
deleted file mode 100644
index 728837d..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.exception;
-
-public class ExecutionTimeoutException extends RuntimeException {
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
deleted file mode 100644
index 28b8a50..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.exception;
-
-public class SchedulerException extends RuntimeException {
-
-  public SchedulerException(String message) {
-    super(message);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
deleted file mode 100644
index 315a09b..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.exception;
-
-public class StoreException extends Exception {
-
-  public StoreException() {
-    super();
-  }
-
-  public StoreException(String message) {
-    super(message);
-  }
-
-  public StoreException(Exception e) {
-    super(e);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java
new file mode 100644
index 0000000..4b79ee4
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.sdk.store.Row;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * A DataLoader is used to load data from files into the table
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface DataLoader extends TransactionalOperation, Serializable {
+  /**
+   * Trigger the load operation
+   * @throws CarbonException if any error occurs
+   */
+  void load() throws CarbonException;
+
+  /**
+   * Append a batch of rows.
+   * @param rows rows to append
+   * @param schema schema of the input row
+   * @throws CarbonException if any error occurs
+   */
+  void append(Iterator<Row> rows, StructType schema) throws CarbonException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java
new file mode 100644
index 0000000..c6c5628
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface DataScanner<T> extends Serializable {
+
+  /**
+   * Perform a scan in a distributed compute framework like Spark, Presto, etc.
+   * Filter/Projection/Limit operation is pushed down to the scan.
+   *
+   * This should be used with {@link Pruner#prune(TableIdentifier, Expression)}
+   * in a distributed compute environment. It enables the framework to
+   * do a parallel scan by creating multiple {@link ScanUnit}s and performing
+   * the scan in parallel in workers, such as Spark executors.
+   *
+   * The result is returned in batches so that the caller can start the next
+   * level of computation before getting all results, such as
+   * implementing a `prefetch` execution model.
+   *
+   * @param input one scan unit
+   * @return scan result, the result is returned in batch
+   * @throws CarbonException if any error occurs
+   */
+  Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java
new file mode 100644
index 0000000..01c2008
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * Internal API for engine integration developers
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface InternalCarbonStore extends CarbonStore {
+
+  /**
+   * Get CarbonTable object from the store
+   *
+   * @param tableIdentifier table identifier
+   * @return CarbonTable object
+   * @throws CarbonException if any error occurs
+   */
+  CarbonTable getCarbonTable(TableIdentifier tableIdentifier) throws CarbonException;
+
+  /**
+   * Return a new Loader that can be used to load data in distributed compute framework
+   * @param load descriptor for load operation
+   * @return a new Loader
+   * @throws CarbonException if any error occurs
+   */
+  DataLoader newLoader(LoadDescriptor load) throws CarbonException;
+
+  /**
+   * Return a new Scanner that can be used for parallel scan
+   *
+   * @param tableIdentifier table to scan
+   * @param scanOption options for scan, use {@link ScanOption} for the map key
+   * @param readSupport read support to convert the row to output object
+   * @param <T> the target object type contained in {@link ResultBatch}
+   * @return a new Scanner
+   * @throws CarbonException if any error occurs
+   */
+  <T> Scanner<T> newScanner(
+      TableIdentifier tableIdentifier,
+      ScanDescriptor scanDescriptor,
+      Map<String, String> scanOption,
+      CarbonReadSupport<T> readSupport) throws CarbonException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java
new file mode 100644
index 0000000..c875aa1
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.store.impl.InternalCarbonStoreImpl;
+
+/**
+ * Factory that creates {@link InternalCarbonStore} instances and caches them by store name.
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public class InternalCarbonStoreFactory {
+
+  private static final Map<String, InternalCarbonStore> stores = new ConcurrentHashMap<>();
+
+  /**
+   * Return the store cached under the given name, creating it on first request.
+   * The conf is only used when no store is cached yet for that name.
+   *
+   * @param storeName name used as the cache key
+   * @param conf configuration used to create the store on a cache miss
+   * @return the cached or newly created store
+   * @throws IOException if creating a new store fails
+   */
+  public static synchronized InternalCarbonStore getStore(String storeName, StoreConf conf)
+      throws IOException {
+    InternalCarbonStore store = stores.get(storeName);
+    if (store == null) {
+      // create only on a cache miss; an eagerly evaluated default argument would
+      // construct (and discard) a new store on every call
+      store = newStore(conf);
+      stores.put(storeName, store);
+    }
+    return store;
+  }
+
+  /** Create a new store from the given configuration. */
+  private static InternalCarbonStore newStore(StoreConf conf) throws IOException {
+    return new InternalCarbonStoreImpl(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
new file mode 100644
index 0000000..4a1d2e5
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface Pruner {
+
+  /**
+   * Return a list of ScanUnit which will be the input in
+   * {@link Scanner#scan(ScanUnit)}
+   *
+   * Implementation will leverage index to prune using specified
+   * filter expression
+   *
+   * @param table table identifier
+   * @param filterExpression expression of filter predicate given by user
+   * @return list of ScanUnit which should be passed to
+   *         {@link Scanner#scan(ScanUnit)}
+   * @throws CarbonException if any error occurs
+   */
+  List<ScanUnit> prune(TableIdentifier table, Expression filterExpression) throws CarbonException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java
new file mode 100644
index 0000000..221e0f6
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface ResultBatch<T> {
+
+  /**
+   * Return true if the result is returned in columnar batch, otherwise is row by row.
+   * By default, it is columnar batch.
+   */
+  default boolean isColumnar() {
+    return true;
+  }
+
+  /**
+   * Return true if there are more elements in this batch.
+   */
+  boolean hasNext();
+
+  /**
+   * Return next item.
+   * If {@link #isColumnar()} returns true, there is only one element in this batch
+   * which is {@link ColumnarBatch}; otherwise, this batch returns row by row and the
+   * caller should call next() until no element is left.
+   */
+  T next();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java
new file mode 100644
index 0000000..8e5797d
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public class ScanOption {
+
+  /** batch size in number of rows in one {@link ResultBatch} */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /**
+   * set to true to return row-major objects in {@link ResultBatch},
+   * otherwise columnar objects are returned
+   */
+  public static final String ROW_MAJOR = "rowMajor";
+
+  /**
+   * set to true to enable remote pruning by RPC call,
+   * otherwise pruning executes in caller's JVM
+   */
+  public static final String REMOTE_PRUNE = "remotePrune";
+
+  /**
+   * set to true to enable operator pushdown for operations like scan and load,
+   * otherwise the operation executes in caller's JVM
+   */
+  public static final String OP_PUSHDOWN = "operatorPushDown";
+
+  /**
+   * Return true if REMOTE_PRUNE is set to true, default is false
+   */
+  public static boolean isRemotePrune(Map<String, String> options) {
+    if (options == null) {
+      return false;
+    }
+    return Boolean.valueOf(options.getOrDefault(REMOTE_PRUNE, "false"));
+  }
+
+  /**
+   * Return true if OP_PUSHDOWN is set to true, default is false
+   */
+  public static boolean isOperatorPushdown(Map<String, String> options) {
+    if (options == null) {
+      return false;
+    }
+    return Boolean.valueOf(options.getOrDefault(OP_PUSHDOWN, "false"));
+  }
+}
\ No newline at end of file