You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/08/06 11:04:49 UTC

[2/5] carbondata git commit: [CARBONDATA-2825][CARBONDATA-2828] CarbonStore and InternalCarbonStore API This closes #2589

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java
new file mode 100644
index 0000000..2650bbf
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.service.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Holds the integer status and the text message of a shutdown response.
+ * It is serialized by the {@link Writable} methods implemented below.
+ */
+@InterfaceAudience.Internal
+public class ShutdownResponse implements Serializable, Writable {
+  private int status;
+  private String message;
+
+  /**
+   * Creates an empty response; the fields keep their default values until
+   * {@link #readFields(DataInput)} sets them.
+   */
+  public ShutdownResponse() {
+  }
+
+  public ShutdownResponse(int status, String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  public int getStatus() {
+    return status;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  /**
+   * Writes the status followed by the message. A null message is written as an empty
+   * string because {@code writeUTF} cannot encode null (it throws NullPointerException).
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(status);
+    out.writeUTF(message == null ? "" : message);
+  }
+
+  /**
+   * Reads the fields in the same order as {@link #write(DataOutput)} wrote them.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    status = in.readInt();
+    message = in.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java
new file mode 100644
index 0000000..d1b8f43
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.worker;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
+import org.apache.carbondata.store.impl.DataOperation;
+import org.apache.carbondata.store.impl.Status;
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.model.BaseResponse;
+import org.apache.carbondata.store.impl.service.model.LoadDataRequest;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+import org.apache.carbondata.store.impl.service.model.ScanResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * {@link DataService} implementation that handles load-data and scan requests and
+ * shuts down the search-mode query executor thread pools on {@link #close()}.
+ */
+@InterfaceAudience.Internal
+public class DataServiceImpl implements DataService {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataServiceImpl.class.getCanonicalName());
+
+  // temp location for loading (writing sort temp files)
+  private String[] storeTempLocation;
+  private Configuration hadoopConf;
+
+  /**
+   * Captures the Hadoop configuration and the store temp locations of the given worker.
+   */
+  DataServiceImpl(Worker worker) {
+    this.hadoopConf = worker.getHadoopConf();
+    this.storeTempLocation = worker.getConf().storeTempLocation();
+  }
+
+  /**
+   * Loads the CSV input described by the request's load model: one record reader
+   * iterator is created per input split of {@code model.getFactFilePath()} and all of
+   * them are handed to a {@link DataLoadExecutor}.
+   *
+   * @param request request carrying the {@link CarbonLoadModel}
+   * @return a response with {@code Status.SUCCESS} and an empty message, or
+   *         {@code Status.FAILURE} and a description of the caught exception
+   */
+  @Override
+  public BaseResponse loadData(LoadDataRequest request) {
+    DataLoadExecutor executor = null;
+    try {
+      CarbonLoadModel model = request.getModel();
+
+      JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+      // NOTE(review): the returned value is discarded; confirm this call has a needed
+      // side effect, otherwise it can be dropped.
+      CarbonInputFormatUtil.createJobTrackerID(new Date());
+      TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+      TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
+      Configuration configuration = new Configuration(hadoopConf);
+      StoreUtil.configureCSVInputFormat(configuration, model);
+      configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
+      // Set up the attempt context required to use in the output committer.
+      TaskAttemptContext hadoopAttemptContext =
+          new TaskAttemptContextImpl(configuration, taskAttemptId);
+
+      CSVInputFormat format = new CSVInputFormat();
+      List<InputSplit> splits = format.getSplits(hadoopAttemptContext);
+
+      CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()];
+      for (int index = 0; index < splits.size(); index++) {
+        readerIterators[index] = new CSVRecordReaderIterator(
+            format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index),
+            hadoopAttemptContext);
+      }
+
+      executor = new DataLoadExecutor();
+      executor.execute(model, storeTempLocation, readerIterators);
+
+      return new BaseResponse(Status.SUCCESS.ordinal(), "");
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to handle load data");
+      return new BaseResponse(Status.FAILURE.ordinal(), failureMessage(e));
+    } catch (InterruptedException e) {
+      // NOTE(review): the interrupt status is not re-asserted here; confirm whether the
+      // calling RPC handler thread relies on that before changing it.
+      LOGGER.error(e, "Interrupted handle load data ");
+      return new BaseResponse(Status.FAILURE.ordinal(), failureMessage(e));
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed to execute load data ");
+      return new BaseResponse(Status.FAILURE.ordinal(), failureMessage(e));
+    } finally {
+      // unsafe memory of the current task is only cleared when an executor was created
+      if (executor != null) {
+        executor.close();
+        StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
+      }
+    }
+  }
+
+  /**
+   * Executes the scan request and returns the resulting rows.
+   * An {@link IOException} is logged and converted into a failure response; other
+   * exceptions are not caught here.
+   */
+  @Override
+  public ScanResponse scan(ScanRequest scan) {
+    try {
+      LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId()));
+      List<CarbonRow> rows = DataOperation.scan(scan.getTableInfo(), scan);
+      LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId()));
+      return createSuccessResponse(scan, rows);
+    } catch (IOException e) {
+      LOGGER.error(e);
+      LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId()));
+      return createFailureResponse(scan, e);
+    }
+  }
+
+  /**
+   * create a failure response
+   */
+  private ScanResponse createFailureResponse(ScanRequest scan, Throwable throwable) {
+    return new ScanResponse(scan.getRequestId(), Status.FAILURE.ordinal(),
+        failureMessage(throwable), new Object[0][]);
+  }
+
+  /**
+   * create a success response with result rows
+   */
+  private ScanResponse createSuccessResponse(ScanRequest scan, List<CarbonRow> rows) {
+    Iterator<CarbonRow> itor = rows.iterator();
+    Object[][] output = new Object[rows.size()][];
+    int i = 0;
+    while (itor.hasNext()) {
+      output[i++] = itor.next().getData();
+    }
+    return new ScanResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output);
+  }
+
+  /**
+   * Describes a failure for a response message. Falls back to {@code toString()} when the
+   * throwable has no message, so that a null message is never put into a response.
+   */
+  private static String failureMessage(Throwable throwable) {
+    String message = throwable.getMessage();
+    return message != null ? message : throwable.toString();
+  }
+
+  /**
+   * Returns {@code versionID} for every protocol and client version.
+   */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+    return versionID;
+  }
+
+  /**
+   * Always returns null; no protocol signature is provided.
+   */
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+      int clientMethodsHash) throws IOException {
+    return null;
+  }
+
+  /**
+   * Shuts down the thread pools of the search-mode query executors.
+   */
+  @Override
+  public void close() throws IOException {
+    LOGGER.info("Shutting down worker...");
+    SearchModeDetailQueryExecutor.shutdownThreadPool();
+    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
+    LOGGER.info("Worker shut down");
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
deleted file mode 100644
index fd13b20..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl.worker;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.impl.CarbonStoreBase;
-import org.apache.carbondata.store.impl.Status;
-import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
-import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * It handles request from master.
- */
-@InterfaceAudience.Internal
-class RequestHandler {
-
-  private StoreConf storeConf;
-  private Configuration hadoopConf;
-
-  RequestHandler(StoreConf conf, Configuration hadoopConf) {
-    this.storeConf = conf;
-    this.hadoopConf = hadoopConf;
-  }
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RequestHandler.class.getName());
-
-  QueryResponse handleScan(Scan scan) {
-    try {
-      LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId()));
-      CarbonTable table = CarbonTable.buildFromTableInfo(scan.getTableInfo());
-      List<CarbonRow> rows = CarbonStoreBase.scan(table, scan);
-      LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId()));
-      return createSuccessResponse(scan, rows);
-    } catch (IOException e) {
-      LOGGER.error(e);
-      LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId()));
-      return createFailureResponse(scan, e);
-    }
-  }
-
-  ShutdownResponse handleShutdown(ShutdownRequest request) {
-    LOGGER.info("Shutting down worker...");
-    SearchModeDetailQueryExecutor.shutdownThreadPool();
-    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
-    LOGGER.info("Worker shut down");
-    return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
-  }
-
-  /**
-   * create a failure response
-   */
-  private QueryResponse createFailureResponse(Scan scan, Throwable throwable) {
-    return new QueryResponse(scan.getRequestId(), Status.FAILURE.ordinal(),
-        throwable.getMessage(), new Object[0][]);
-  }
-
-  /**
-   * create a success response with result rows
-   */
-  private QueryResponse createSuccessResponse(Scan scan, List<CarbonRow> rows) {
-    Iterator<CarbonRow> itor = rows.iterator();
-    Object[][] output = new Object[rows.size()][];
-    int i = 0;
-    while (itor.hasNext()) {
-      output[i++] = itor.next().getData();
-    }
-    return new QueryResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output);
-  }
-
-  public BaseResponse handleLoadData(LoadDataRequest request) {
-    DataLoadExecutor executor = null;
-    try {
-      CarbonLoadModel model = request.getModel();
-
-      JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
-      CarbonInputFormatUtil.createJobTrackerID(new Date());
-      TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
-      TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
-      Configuration configuration = new Configuration(hadoopConf);
-      StoreUtil.configureCSVInputFormat(configuration, model);
-      configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
-      // Set up the attempt context required to use in the output committer.
-      TaskAttemptContext hadoopAttemptContext =
-          new TaskAttemptContextImpl(configuration, taskAttemptId);
-
-      CSVInputFormat format = new CSVInputFormat();
-      List<InputSplit> splits = format.getSplits(hadoopAttemptContext);
-
-      CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()];
-      for (int index = 0; index < splits.size(); index++) {
-        readerIterators[index] = new CSVRecordReaderIterator(
-            format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index),
-            hadoopAttemptContext);
-      }
-
-      executor = new DataLoadExecutor();
-      executor.execute(model, storeConf.storeTempLocation(), readerIterators);
-
-      return new BaseResponse(Status.SUCCESS.ordinal(), "");
-    } catch (IOException e) {
-      LOGGER.error(e, "Failed to handle load data");
-      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
-    } catch (InterruptedException e) {
-      LOGGER.error(e, "Interrupted handle load data ");
-      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
-    } catch (Exception e) {
-      LOGGER.error(e, "Failed to execute load data ");
-      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
-    } finally {
-      if (executor != null) {
-        executor.close();
-        StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
deleted file mode 100644
index 26f252c..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl.worker;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.impl.rpc.StoreService;
-import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
-import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
-
-import org.apache.hadoop.ipc.ProtocolSignature;
-
-@InterfaceAudience.Internal
-public class StoreServiceImpl implements StoreService {
-
-  private Worker worker;
-  RequestHandler handler;
-
-  public StoreServiceImpl(Worker worker) {
-    this.worker = worker;
-    this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf());
-  }
-
-  @Override
-  public BaseResponse loadData(LoadDataRequest request) {
-    return handler.handleLoadData(request);
-  }
-
-  @Override
-  public QueryResponse query(Scan scan) {
-    return handler.handleScan(scan);
-  }
-
-  @Override
-  public ShutdownResponse shutdown(ShutdownRequest request) {
-    return handler.handleShutdown(request);
-  }
-
-  @Override
-  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-    return versionID;
-  }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
-      int clientMethodsHash) throws IOException {
-    return null;
-  }
-
-  public Worker getWorker() {
-    return worker;
-  }
-
-  public void setWorker(Worker worker) {
-    this.worker = worker;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
index a360e36..e3546d9 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
@@ -23,13 +23,13 @@ import java.net.BindException;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.impl.rpc.RegistryService;
-import org.apache.carbondata.store.impl.rpc.ServiceFactory;
-import org.apache.carbondata.store.impl.rpc.StoreService;
-import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
-import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
-import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.RegistryService;
+import org.apache.carbondata.store.impl.service.ServiceFactory;
+import org.apache.carbondata.store.impl.service.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.impl.service.model.RegisterWorkerResponse;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
@@ -66,15 +66,15 @@ public class Worker {
     int coreNum = conf.workerCoreNum();
     String host = conf.workerHost();
     int port = conf.workerPort();
-    StoreService queryService = new StoreServiceImpl(this);
+    DataService dataService = new DataServiceImpl(this);
     do {
       try {
         server = new RPC.Builder(hadoopConf)
             .setNumHandlers(coreNum)
             .setBindAddress(host)
             .setPort(port)
-            .setProtocol(StoreService.class)
-            .setInstance(queryService)
+            .setProtocol(DataService.class)
+            .setInstance(dataService)
             .build();
         server.start();
 
@@ -116,9 +116,11 @@ public class Worker {
   }
 
   private void registerToMaster() throws IOException {
-    LOGGER.info("trying to register to master " + conf.masterHost() + ":" + conf.masterPort());
+    LOGGER.info("trying to register to master " +
+        conf.masterHost() + ":" + conf.registryServicePort());
     if (registry == null) {
-      registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort());
+      registry = ServiceFactory.createRegistryService(
+          conf.masterHost(), conf.registryServicePort());
     }
     RegisterWorkerRequest request =
         new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
deleted file mode 100644
index 775669f..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.store.api.conf.StoreConf;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.PropertyConfigurator;
-
-@InterfaceAudience.Internal
-public class StoreUtil {
-
-  private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName());
-
-  public static void loadProperties(String filePath, StoreConf conf) {
-    InputStream input = null;
-    try {
-      input = new FileInputStream(filePath);
-      Properties prop = new Properties();
-      prop.load(input);
-      for (Map.Entry<Object, Object> entry : prop.entrySet()) {
-        conf.conf(entry.getKey().toString(), entry.getValue().toString());
-      }
-      LOGGER.audit("loaded properties: " + filePath);
-    } catch (IOException ex) {
-      LOGGER.error(ex, "Failed to load properties file " + filePath);
-    } finally {
-      if (input != null) {
-        try {
-          input.close();
-        } catch (IOException e) {
-          LOGGER.error(e);
-        }
-      }
-    }
-  }
-
-  public static void initLog4j(String propertiesFilePath) {
-    BasicConfigurator.configure();
-    PropertyConfigurator.configure(propertiesFilePath);
-  }
-
-  public static byte[] serialize(Object object) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(object);
-    } catch (IOException e) {
-      LOGGER.error(e);
-    }
-    return baos.toByteArray();
-  }
-
-  public static Object deserialize(byte[] bytes) {
-    if (bytes == null) {
-      return null;
-    }
-    try {
-      ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
-      return ois.readObject();
-    } catch (IOException e) {
-      LOGGER.error(e);
-    } catch (ClassNotFoundException e) {
-      LOGGER.error(e);
-    }
-    return null;
-  }
-
-  public static void configureCSVInputFormat(Configuration configuration,
-      CarbonLoadModel carbonLoadModel) {
-    CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar());
-    CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter());
-    CSVInputFormat.setSkipEmptyLine(configuration, carbonLoadModel.getSkipEmptyLine());
-    CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar());
-    CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns());
-    CSVInputFormat.setNumberOfColumns(configuration,
-        "" + carbonLoadModel.getCsvHeaderColumns().length);
-
-    CSVInputFormat.setHeaderExtractionEnabled(
-        configuration,
-        carbonLoadModel.getCsvHeader() == null ||
-            StringUtils.isEmpty(carbonLoadModel.getCsvHeader()));
-
-    CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar());
-
-    CSVInputFormat.setReadBufferSize(
-        configuration,
-        CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
-            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
-  }
-
-  public static void clearUnsafeMemory(long taskId) {
-    UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId);
-    UnsafeSortMemoryManager.INSTANCE.freeMemoryAll(taskId);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
index 2448660..7f80a33 100644
--- a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
+++ b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
@@ -27,14 +27,15 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.CarbonStoreFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.CarbonStoreFactory;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.store.impl.master.Master;
 import org.apache.carbondata.store.impl.worker.Worker;
 
 import org.junit.After;
@@ -50,24 +51,36 @@ public class DistributedCarbonStoreTest {
   private static CarbonStore store;
 
   @BeforeClass
-  public static void beforeAll() throws IOException, StoreException {
+  public static void beforeAll() throws IOException, CarbonException, InterruptedException {
     projectFolder = new File(DistributedCarbonStoreTest.class.getResource("/").getPath() +
         "../../../../").getCanonicalPath();
+
     String confFile = projectFolder + "/store/conf/store.conf";
     StoreConf storeConf = new StoreConf(confFile);
 
-    store = CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf);
-    projectFolder = new File(LocalCarbonStoreTest.class.getResource("/").getPath() + "../../../../")
-        .getCanonicalPath();
+    new Thread(() -> {
+      try {
+        Master.main(new String[]{"", confFile});
+      } catch (InterruptedException | IOException e) {
+        throw new RuntimeException("failed to start master", e);
+      }
+    }).start();
+    Thread.sleep(1000);
 
     // start worker
     Worker worker = new Worker(storeConf);
     worker.start();
+
+    Thread.sleep(1000);
+
+    store = CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf);
   }
 
   @AfterClass
   public static void afterAll() throws IOException {
-    store.close();
+    if (store != null) {
+      store.close();
+    }
   }
 
   @Before
@@ -81,13 +94,13 @@ public class DistributedCarbonStoreTest {
   }
 
   @Test
-  public void testSelect() throws IOException, StoreException {
+  public void testSelect() throws CarbonException {
     TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default");
     store.dropTable(tableIdentifier);
-    TableDescriptor table = TableDescriptor
+    TableDescriptor descriptor = TableDescriptor
         .builder()
-        .ifNotExists()
         .table(tableIdentifier)
+        .ifNotExists()
         .comment("first table")
         .column("shortField", DataTypes.SHORT, "short field")
         .column("intField", DataTypes.INT, "int field")
@@ -101,7 +114,7 @@ public class DistributedCarbonStoreTest {
         .column("floatField", DataTypes.DOUBLE, "float field")
         .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
         .create();
-    store.createTable(table);
+    store.createTable(descriptor);
 
     // load one segment
     LoadDescriptor load = LoadDescriptor
@@ -114,26 +127,26 @@ public class DistributedCarbonStoreTest {
     store.loadData(load);
 
     // select row
-    SelectDescriptor select = SelectDescriptor
+    ScanDescriptor select = ScanDescriptor
         .builder()
         .table(tableIdentifier)
-        .select("intField", "stringField")
+        .select(new String[]{"intField", "stringField"})
         .limit(5)
         .create();
-    List<CarbonRow> result = store.select(select);
+    List<CarbonRow> result = store.scan(select);
     Assert.assertEquals(5, result.size());
 
     // select row with filter
-    SelectDescriptor select2 = SelectDescriptor
+    ScanDescriptor select2 = ScanDescriptor
         .builder()
         .table(tableIdentifier)
-        .select("intField", "stringField")
+        .select(new String[]{"intField", "stringField"})
         .filter(new EqualToExpression(
             new ColumnExpression("intField", DataTypes.INT),
             new LiteralExpression(11, DataTypes.INT)))
         .limit(5)
         .create();
-    List<CarbonRow> result2 = store.select(select2);
+    List<CarbonRow> result2 = store.scan(select2);
     Assert.assertEquals(1, result2.size());
 
     store.dropTable(tableIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
index 420c8cf..8cf7e14 100644
--- a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
+++ b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
@@ -22,20 +22,19 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.CarbonStoreFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.CarbonStoreFactory;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -50,7 +49,7 @@ public class LocalCarbonStoreTest {
   private static CarbonStore store;
 
   @BeforeClass
-  public static void setup() throws IOException, StoreException {
+  public static void setup() throws IOException, CarbonException {
     StoreConf conf = new StoreConf("test", "./");
     conf.conf(StoreConf.STORE_TEMP_LOCATION, "./temp");
     store = CarbonStoreFactory.getLocalStore("LocalCarbonStoreTest", conf);
@@ -74,13 +73,13 @@ public class LocalCarbonStoreTest {
   }
 
   @Test
-  public void testWriteAndReadFiles() throws IOException, StoreException {
+  public void testWriteAndReadFiles() throws IOException, CarbonException {
     TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default");
     store.dropTable(tableIdentifier);
-    TableDescriptor table = TableDescriptor
+    TableDescriptor descriptor = TableDescriptor
         .builder()
-        .ifNotExists()
         .table(tableIdentifier)
+        .ifNotExists()
         .comment("first table")
         .column("shortField", DataTypes.SHORT, "short field")
         .column("intField", DataTypes.INT, "int field")
@@ -94,7 +93,7 @@ public class LocalCarbonStoreTest {
         .column("floatField", DataTypes.DOUBLE, "float field")
         .tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
         .create();
-    store.createTable(table);
+    store.createTable(descriptor);
 
     // load one segment
     LoadDescriptor load = LoadDescriptor
@@ -107,26 +106,26 @@ public class LocalCarbonStoreTest {
     store.loadData(load);
 
     // select row
-    SelectDescriptor select = SelectDescriptor
+    ScanDescriptor select = ScanDescriptor
         .builder()
         .table(tableIdentifier)
-        .select("intField", "stringField")
+        .select(new String[]{"intField", "stringField"})
         .limit(5)
         .create();
-    List<CarbonRow> result = store.select(select);
+    List<CarbonRow> result = store.scan(select);
     Assert.assertEquals(5, result.size());
 
     // select row with filter
-    SelectDescriptor select2 = SelectDescriptor
+    ScanDescriptor select2 = ScanDescriptor
         .builder()
         .table(tableIdentifier)
-        .select("intField", "stringField")
+        .select(new String[]{"intField", "stringField"})
         .filter(new EqualToExpression(
             new ColumnExpression("intField", DataTypes.INT),
             new LiteralExpression(11, DataTypes.INT)))
         .limit(5)
         .create();
-    List<CarbonRow> result2 = store.select(select2);
+    List<CarbonRow> result2 = store.scan(select2);
     Assert.assertEquals(1, result2.size());
 
     store.dropTable(tableIdentifier);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
index f73591c..dffc8a7 100644
--- a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
+++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
@@ -18,21 +18,11 @@
 package org.apache.carbondata.store;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.sdk.file.CarbonWriter;
-import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
-import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.junit.Assert;
 
 public class TestUtil {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
index eaa4583..e1b039e 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
@@ -28,7 +28,7 @@ import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
 import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
 import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
 
 /**
  * Client to send REST request to Horizon service
@@ -40,9 +40,9 @@ public interface HorizonClient extends Closeable {
   /**
    * Create a Table
    * @param create descriptor for create table operation
-   * @throws IOException if network or disk IO error occurs
+   * @throws CarbonException if network or disk IO error occurs
    */
-  void createTable(CreateTableRequest create) throws IOException, StoreException;
+  void createTable(CreateTableRequest create) throws CarbonException;
 
   /**
    * Drop a Table, and remove all data in it
@@ -56,7 +56,7 @@ public interface HorizonClient extends Closeable {
    * @param load descriptor for load operation
    * @throws IOException if network or disk IO error occurs
    */
-  void loadData(LoadRequest load) throws IOException, StoreException;
+  void loadData(LoadRequest load) throws IOException, CarbonException;
 
   /**
    * Scan a Table and return matched rows
@@ -64,7 +64,7 @@ public interface HorizonClient extends Closeable {
    * @return matched rows
    * @throws IOException if network or disk IO error occurs
    */
-  List<CarbonRow> select(SelectRequest select) throws IOException, StoreException;
+  List<CarbonRow> select(SelectRequest select) throws IOException, CarbonException;
 
   /**
    * Executor a SQL statement

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
index 076df70..b24c8d2 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
 import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
 
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.client.RestTemplate;
@@ -52,7 +52,7 @@ public class SimpleHorizonClient implements HorizonClient {
   }
 
   @Override
-  public void createTable(CreateTableRequest create) throws IOException, StoreException {
+  public void createTable(CreateTableRequest create) throws CarbonException {
     Objects.requireNonNull(create);
     restTemplate.postForEntity(serviceUri + "/table/create", create, String.class);
   }
@@ -64,18 +64,18 @@ public class SimpleHorizonClient implements HorizonClient {
   }
 
   @Override
-  public void loadData(LoadRequest load) throws IOException, StoreException {
+  public void loadData(LoadRequest load) throws IOException, CarbonException {
     Objects.requireNonNull(load);
     restTemplate.postForEntity(serviceUri + "/table/load", load, String.class);
   }
 
   @Override
-  public List<CarbonRow> select(SelectRequest select) throws IOException, StoreException {
+  public List<CarbonRow> select(SelectRequest select) throws IOException, CarbonException {
     Objects.requireNonNull(select);
     ResponseEntity<SelectResponse> response =
         restTemplate.postForEntity(serviceUri + "/table/select", select, SelectResponse.class);
-    Object[][] rows = Objects.requireNonNull(response.getBody()).getRows();
-    List<CarbonRow> output = new ArrayList<>(rows.length);
+    List<Object[]> rows = Objects.requireNonNull(response.getBody()).getRows();
+    List<CarbonRow> output = new ArrayList<>(rows.size());
     for (Object[] row : rows) {
       output.add(new CarbonRow(row));
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
index a30b587..cffca07 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
@@ -17,7 +17,8 @@
 
 package org.apache.carbondata.horizon.rest.controller;
 
-import org.apache.carbondata.store.api.conf.StoreConf;
+
+import org.apache.carbondata.sdk.store.conf.StoreConf;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
index a273f54..d3d1bd7 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
@@ -18,7 +18,8 @@ package org.apache.carbondata.horizon.rest.controller;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -36,14 +37,17 @@ import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
 import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.CarbonStoreFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.InternalCarbonStore;
+import org.apache.carbondata.store.devapi.InternalCarbonStoreFactory;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.devapi.Scanner;
 
 import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
@@ -59,15 +63,15 @@ public class HorizonController {
   private static LogService LOGGER =
       LogServiceFactory.getLogService(HorizonController.class.getName());
 
-  private CarbonStore store;
+  private InternalCarbonStore store;
 
-  public HorizonController() throws StoreException {
+  public HorizonController() throws CarbonException {
     String storeFile = System.getProperty("carbonstore.conf.file");
     StoreConf storeConf = new StoreConf();
     try {
       storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath())
           .conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost().getHostAddress())
-          .conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort())
+          .conf(StoreConf.STORE_PORT, CarbonProperties.getSearchMasterPort())
           .conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost().getHostAddress())
           .conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort())
           .conf(StoreConf.WORKER_CORE_NUM, 2);
@@ -76,13 +80,10 @@ public class HorizonController {
         storeConf.load(storeFile);
       }
 
-    } catch (UnknownHostException e) {
-      throw new StoreException(e);
+      store = InternalCarbonStoreFactory.getStore(storeConf);
     } catch (IOException e) {
-      throw new StoreException(e);
+      throw new CarbonException(e);
     }
-
-    store = CarbonStoreFactory.getDistributedStore("GlobalStore", storeConf);
   }
 
   @RequestMapping(value = "echo")
@@ -92,7 +93,7 @@ public class HorizonController {
 
   @RequestMapping(value = "/table/create", produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<String> createTable(
-      @RequestBody CreateTableRequest request) throws StoreException, IOException {
+      @RequestBody CreateTableRequest request) throws CarbonException {
     RequestValidator.validateTable(request);
     TableDescriptor tableDescriptor = request.convertToDto();
     store.createTable(tableDescriptor);
@@ -101,7 +102,7 @@ public class HorizonController {
 
   @RequestMapping(value = "/table/drop", produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<String> dropTable(
-      @RequestBody DropTableRequest request) throws StoreException, IOException {
+      @RequestBody DropTableRequest request) throws CarbonException {
     RequestValidator.validateDrop(request);
     store.dropTable(new TableIdentifier(request.getTableName(), request.getDatabaseName()));
     return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK);
@@ -109,7 +110,7 @@ public class HorizonController {
 
   @RequestMapping(value = "/table/load", produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<String> load(@RequestBody LoadRequest request)
-      throws StoreException, IOException {
+      throws CarbonException, IOException {
     RequestValidator.validateLoad(request);
     LoadDescriptor loadDescriptor = request.convertToDto();
     store.loadData(loadDescriptor);
@@ -118,22 +119,29 @@ public class HorizonController {
 
   @RequestMapping(value = "/table/select", produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<SelectResponse> select(@RequestBody SelectRequest request)
-      throws StoreException, IOException {
+      throws CarbonException {
     long start = System.currentTimeMillis();
     RequestValidator.validateSelect(request);
     TableIdentifier table = new TableIdentifier(request.getTableName(), request.getDatabaseName());
-    CarbonTable carbonTable = store.getTable(table);
+    CarbonTable carbonTable = store.getCarbonTable(table);
     Expression expression = Parser.parseFilter(request.getFilter(), carbonTable);
-    SelectDescriptor selectDescriptor = new SelectDescriptor(
+    Scanner<CarbonRow> scanner = store.newScanner(table);
+    List<ScanUnit> scanUnits = scanner.prune(table, expression);
+    ScanDescriptor scanDescriptor = new ScanDescriptor(
         table, request.getSelect(), expression, request.getLimit());
-    List<CarbonRow> result = store.select(selectDescriptor);
-    Iterator<CarbonRow> iterator = result.iterator();
-    Object[][] output = new Object[result.size()][];
-    int i = 0;
-    while (iterator.hasNext()) {
-      output[i] = (iterator.next().getData());
-      i++;
+    ArrayList<Object[]> output = new ArrayList<>();
+    for (ScanUnit scanUnit : scanUnits) {
+      Iterator<? extends ResultBatch<CarbonRow>> iterator = scanner.scan(
+          scanUnit, scanDescriptor, new HashMap<String, String>());
+
+      while (iterator.hasNext()) {
+        ResultBatch<CarbonRow> rows = iterator.next();
+        while (rows.hasNext()) {
+          output.add(rows.next().getData());
+        }
+      }
     }
+
     long end = System.currentTimeMillis();
     LOGGER.audit("[" + request.getRequestId() + "] HorizonController select " +
         request.getDatabaseName() + "." + request.getTableName() +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
index fbba57b..15a30cb 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
@@ -21,63 +21,63 @@ import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
 import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
 import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
 
 import org.apache.commons.lang.StringUtils;
 
 public class RequestValidator {
 
-  public static void validateSelect(SelectRequest request) throws StoreException {
+  public static void validateSelect(SelectRequest request) throws CarbonException {
     if (request == null) {
-      throw new StoreException("Select should not be null");
+      throw new CarbonException("Select should not be null");
     }
     if (StringUtils.isEmpty(request.getDatabaseName())) {
-      throw new StoreException("database name is invalid");
+      throw new CarbonException("database name is invalid");
     }
     if (StringUtils.isEmpty(request.getTableName())) {
-      throw new StoreException("table name is invalid");
+      throw new CarbonException("table name is invalid");
     }
   }
 
-  public static void validateTable(CreateTableRequest request) throws StoreException {
+  public static void validateTable(CreateTableRequest request) throws CarbonException {
     if (request == null) {
-      throw new StoreException("TableDescriptor should not be null");
+      throw new CarbonException("TableDescriptor should not be null");
     }
     if (StringUtils.isEmpty(request.getDatabaseName())) {
-      throw new StoreException("database name is invalid");
+      throw new CarbonException("database name is invalid");
     }
     if (StringUtils.isEmpty(request.getTableName())) {
-      throw new StoreException("table name is invalid");
+      throw new CarbonException("table name is invalid");
     }
     if (request.getFields() == null || request.getFields().length == 0) {
-      throw new StoreException("fields should not be empty");
+      throw new CarbonException("fields should not be empty");
     }
   }
 
-  public static void validateLoad(LoadRequest request)  throws StoreException {
+  public static void validateLoad(LoadRequest request)  throws CarbonException {
     if (request == null) {
-      throw new StoreException("LoadDescriptor should not be null");
+      throw new CarbonException("LoadDescriptor should not be null");
     }
     if (StringUtils.isEmpty(request.getDatabaseName())) {
-      throw new StoreException("database name is invalid");
+      throw new CarbonException("database name is invalid");
     }
     if (StringUtils.isEmpty(request.getTableName())) {
-      throw new StoreException("table name is invalid");
+      throw new CarbonException("table name is invalid");
     }
     if (StringUtils.isEmpty(request.getInputPath())) {
-      throw new StoreException("input path is invalid");
+      throw new CarbonException("input path is invalid");
     }
   }
 
-  public static void validateDrop(DropTableRequest request)  throws StoreException {
+  public static void validateDrop(DropTableRequest request)  throws CarbonException {
     if (request == null) {
-      throw new StoreException("DropTableRequest should not be null");
+      throw new CarbonException("DropTableRequest should not be null");
     }
     if (StringUtils.isEmpty(request.getDatabaseName())) {
-      throw new StoreException("database name is invalid");
+      throw new CarbonException("database name is invalid");
     }
     if (StringUtils.isEmpty(request.getTableName())) {
-      throw new StoreException("table name is invalid");
+      throw new CarbonException("table name is invalid");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
index cf59f7f..623abee 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
@@ -24,8 +24,8 @@ import java.util.Map;
 
 import org.apache.carbondata.sdk.file.Field;
 import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
 
 public class CreateTableRequest extends Request {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
index b809d9e..200b3a2 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
@@ -108,7 +108,7 @@ public class FieldRequest {
     field.setPrecision(precision);
     field.setScale(scale);
     field.setColumnComment(comment);
-    field.setChildren(new LinkedList<StructField>());
+    field.setChildren(new LinkedList<Field>());
     return field;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
index c91f5f5..dfe21f6 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
@@ -20,8 +20,8 @@ package org.apache.carbondata.horizon.rest.model.view;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
 
 public class LoadRequest extends Request {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
index 6bf5c75..e4f8cf8 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
@@ -17,23 +17,22 @@
 
 package org.apache.carbondata.horizon.rest.model.view;
 
+import java.util.List;
+
 public class SelectResponse extends Response {
 
-  private Object[][] rows;
+  private List<Object[]> rows;
 
   public SelectResponse() {
   }
 
-  public SelectResponse(SelectRequest request, String message, Object[][] rows) {
+  public SelectResponse(SelectRequest request, String message, List<Object[]> rows) {
     super(request, message);
     this.rows = rows;
   }
 
-  public Object[][] getRows() {
+  public List<Object[]> getRows() {
     return rows;
   }
 
-  public void setRows(Object[][] rows) {
-    this.rows = rows;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
index 91a9dba..59c4bd1 100644
--- a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
+++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
@@ -32,9 +32,9 @@ import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
 import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
 import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.store.api.exception.CarbonException;
 import org.apache.carbondata.store.impl.worker.Worker;
-import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -109,13 +109,13 @@ public class HorizonTest {
     SelectRequest select = createSelectRequest(5, null, "intField", "stringField");
     SelectResponse result =
         restTemplate.postForObject(serviceUri + "/table/select", select, SelectResponse.class);
-    Assert.assertEquals(5, result.getRows().length);
+    Assert.assertEquals(5, result.getRows().size());
 
     // select row with filter
     SelectRequest filter = createSelectRequest(5, "intField = 11", "intField", "stringField");
     SelectResponse filterResult =
         restTemplate.postForObject(serviceUri + "/table/select", filter, SelectResponse.class);
-    Assert.assertEquals(1, filterResult.getRows().length);
+    Assert.assertEquals(1, filterResult.getRows().size());
 
     request = createDropTableRequest();
     response = restTemplate.postForObject(serviceUri + "/table/drop", request, String.class);
@@ -173,7 +173,7 @@ public class HorizonTest {
   }
 
   @Test
-  public void testHorizonClient() throws IOException, StoreException {
+  public void testHorizonClient() throws IOException, CarbonException {
     HorizonClient client = new SimpleHorizonClient(serviceUri);
     DropTableRequest drop = createDropTableRequest();
     client.dropTable(drop);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index aecf7e2..b16a290 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -51,8 +51,8 @@
       <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
       <plugin>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index fdd1f5a..771896b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
@@ -30,6 +31,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDi
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
@@ -239,9 +241,9 @@ public class AvroCarbonWriter extends CarbonWriter {
         return new Field(FieldName, DataTypes.DOUBLE);
       case RECORD:
         // recursively get the sub fields
-        ArrayList<StructField> structSubFields = new ArrayList<>();
+        ArrayList<Field> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
           if (structField != null) {
             structSubFields.add(structField);
           }
@@ -249,9 +251,9 @@ public class AvroCarbonWriter extends CarbonWriter {
         return new Field(FieldName, "struct", structSubFields);
       case ARRAY:
         // recursively get the sub fields
-        ArrayList<StructField> arraySubField = new ArrayList<>();
+        ArrayList<Field> arraySubField = new ArrayList<>();
         // array will have only one sub field.
-        StructField structField = prepareSubFields("val", childSchema.getElementType());
+        Field structField = prepareSubFields("val", childSchema.getElementType());
         if (structField != null) {
           arraySubField.add(structField);
           return new Field(FieldName, "array", arraySubField);
@@ -266,51 +268,51 @@ public class AvroCarbonWriter extends CarbonWriter {
     }
   }
 
-  private static StructField prepareSubFields(String FieldName, Schema childSchema) {
+  private static Field prepareSubFields(String FieldName, Schema childSchema) {
     Schema.Type type = childSchema.getType();
     LogicalType logicalType = childSchema.getLogicalType();
     switch (type) {
       case BOOLEAN:
-        return new StructField(FieldName, DataTypes.BOOLEAN);
+        return new Field(FieldName, DataTypes.BOOLEAN);
       case INT:
         if (logicalType instanceof LogicalTypes.Date) {
-          return new StructField(FieldName, DataTypes.DATE);
+          return new Field(FieldName, DataTypes.DATE);
         } else {
           LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema
               .getName());
-          return new StructField(FieldName, DataTypes.INT);
+          return new Field(FieldName, DataTypes.INT);
         }
       case LONG:
         if (logicalType instanceof LogicalTypes.TimestampMillis
             || logicalType instanceof LogicalTypes.TimestampMicros) {
-          return new StructField(FieldName, DataTypes.TIMESTAMP);
+          return new Field(FieldName, DataTypes.TIMESTAMP);
         } else {
           LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema
               .getName());
-          return new StructField(FieldName, DataTypes.LONG);
+          return new Field(FieldName, DataTypes.LONG);
         }
       case DOUBLE:
-        return new StructField(FieldName, DataTypes.DOUBLE);
+        return new Field(FieldName, DataTypes.DOUBLE);
       case STRING:
-        return new StructField(FieldName, DataTypes.STRING);
+        return new Field(FieldName, DataTypes.STRING);
       case FLOAT:
-        return new StructField(FieldName, DataTypes.DOUBLE);
+        return new Field(FieldName, DataTypes.DOUBLE);
       case RECORD:
         // recursively get the sub fields
-        ArrayList<StructField> structSubFields = new ArrayList<>();
+        ArrayList<Field> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
           if (structField != null) {
             structSubFields.add(structField);
           }
         }
-        return (new StructField(FieldName, DataTypes.createStructType(structSubFields)));
+        return (new Field(FieldName, createStructType(structSubFields)));
       case ARRAY:
         // recursively get the sub fields
         // array will have only one sub field.
         DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
         if (subType != null) {
-          return (new StructField(FieldName, DataTypes.createArrayType(subType)));
+          return (new Field(FieldName, DataTypes.createArrayType(subType)));
         } else {
           return null;
         }
@@ -322,6 +324,14 @@ public class AvroCarbonWriter extends CarbonWriter {
     }
   }
 
+  /**
+   * Convert a list of SDK {@link Field} into a core {@link StructType}.
+   *
+   * Every field is mapped to a {@link StructField} carrying the same name and data type,
+   * see {@link #toStructField(Field)}.
+   *
+   * @param fields fields to convert, must not be null
+   * @return struct type created from the converted fields
+   */
+  private static StructType createStructType(List<Field> fields) {
+    List<StructField> structFields = fields.stream()
+        .map(AvroCarbonWriter::toStructField)
+        .collect(Collectors.toList());
+    return DataTypes.createStructType(structFields);
+  }
+
+  /**
+   * Convert one SDK {@link Field} into a {@link StructField}, converting its children
+   * recursively when a child list is present.
+   *
+   * @param field field to convert
+   * @return struct field with the same name and data type as the input
+   */
+  private static StructField toStructField(Field field) {
+    List<Field> children = field.getChildren();
+    if (children == null) {
+      // No child list is attached to this field (presumably the case for fields built
+      // from a name and a data type only); recursing would throw NullPointerException,
+      // so rely on the data type alone.
+      return new StructField(field.getFieldName(), field.getDataType());
+    }
+    return new StructField(field.getFieldName(), field.getDataType(),
+        createStructType(children).getFields());
+  }
+
   private static DataType getMappingDataTypeForArrayRecord(Schema childSchema) {
     LogicalType logicalType = childSchema.getLogicalType();
     switch (childSchema.getType()) {
@@ -360,14 +370,14 @@ public class AvroCarbonWriter extends CarbonWriter {
         return DataTypes.DOUBLE;
       case RECORD:
         // recursively get the sub fields
-        ArrayList<StructField> structSubFields = new ArrayList<>();
+        ArrayList<Field> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
           if (structField != null) {
             structSubFields.add(structField);
           }
         }
-        return DataTypes.createStructType(structSubFields);
+        return createStructType(structSubFields);
       case ARRAY:
         // array will have only one sub field.
         DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index a9d725f..92ed0d8 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -513,8 +513,8 @@ public class CarbonWriterBuilder {
           } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) {
             // Loop through the inner columns and for a StructData
             List<StructField> structFieldsArray =
-                new ArrayList<StructField>(field.getChildren().size());
-            for (StructField childFld : field.getChildren()) {
+                new ArrayList<>(field.getChildren().size());
+            for (Field childFld : field.getChildren()) {
               structFieldsArray
                   .add(new StructField(childFld.getFieldName(), childFld.getDataType()));
             }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index add10c1..0d70c3b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -17,13 +17,13 @@
 
 package org.apache.carbondata.sdk.file;
 
+import java.io.Serializable;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -32,11 +32,11 @@ import org.apache.carbondata.core.util.DataTypeUtil;
  */
 @InterfaceAudience.User
 @InterfaceStability.Unstable
-public class Field {
+public class Field implements Serializable {
 
   private String name;
   private DataType type;
-  private List<StructField> children;
+  private List<Field> children;
   private String parent;
   private String storeType = "columnnar";
   private int schemaOrdinal = -1;
@@ -54,11 +54,11 @@ public class Field {
     this(name, DataTypeUtil.valueOf(type));
   }
 
-  public Field(String name, String type, List<StructField> fields) {
+  public Field(String name, String type, List<Field> fields) {
     this(name, DataTypeUtil.valueOf(type), fields);
   }
 
-  public Field(String name, DataType type, List<StructField> fields) {
+  public Field(String name, DataType type, List<Field> fields) {
     this.name = name;
     this.type = type;
     this.children = fields;
@@ -91,11 +91,11 @@ public class Field {
     return type;
   }
 
-  public List<StructField> getChildren() {
+  public List<Field> getChildren() {
     return children;
   }
 
-  public void setChildren(List<StructField> children) {
+  public void setChildren(List<Field> children) {
     this.children = children;
   }
 
@@ -150,4 +150,5 @@ public class Field {
   public void setColumnComment(String columnComment) {
     this.columnComment = columnComment;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
index c9622e1..075ae71 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -44,7 +45,7 @@ import org.apache.commons.lang.StringUtils;
  */
 @InterfaceAudience.User
 @InterfaceStability.Unstable
-public class Schema {
+public class Schema implements Serializable {
 
   private Field[] fields;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java
new file mode 100644
index 0000000..0472b75
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.store;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * Public interface of CarbonStore, covering table metadata, write and read operations.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface CarbonStore extends Closeable {
+
+  ////////////////////////////////////////////////////////////////////
+  /////                  Metadata Operation                      /////
+  ////////////////////////////////////////////////////////////////////
+
+  /**
+   * Create a table
+   * @param descriptor descriptor for create table operation
+   * @throws CarbonException if any error occurs
+   */
+  void createTable(TableDescriptor descriptor) throws CarbonException;
+
+  /**
+   * Drop a table and remove all data in it
+   * @param table table identifier
+   * @throws CarbonException if any error occurs
+   */
+  void dropTable(TableIdentifier table) throws CarbonException;
+
+  /**
+   * @return descriptors of all tables created
+   * @throws CarbonException if any error occurs
+   */
+  List<TableDescriptor> listTable() throws CarbonException;
+
+  /**
+   * Return the table descriptor for the specified identifier
+   * @param table table identifier
+   * @return table descriptor
+   * @throws CarbonException if any error occurs
+   */
+  TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException;
+
+  /**
+   * Alter a table
+   * @param table table identifier
+   * @param newTable new table descriptor to alter to
+   * @throws CarbonException if any error occurs
+   */
+  void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException;
+
+
+  ////////////////////////////////////////////////////////////////////
+  /////                     Write Operation                      /////
+  ////////////////////////////////////////////////////////////////////
+
+  /**
+   * Trigger a load into the table specified by the load descriptor
+   * @param load descriptor for load operation
+   * @throws CarbonException if any error occurs
+   */
+  void loadData(LoadDescriptor load) throws CarbonException;
+
+  /**
+   * Return true if the table was created with a primary key defined, using
+   * {@link #createTable(TableDescriptor)}. For such a table, upsert, delete and
+   * lookup are supported. This default implementation always returns false.
+   *
+   * @param identifier table identifier
+   * @return true if this table has primary key.
+   */
+  default boolean isPrimaryKeyDefined(TableIdentifier identifier) {
+    return false;
+  }
+
+  /**
+   * Insert each row of the batch if its key does not exist, otherwise update the row
+   * @param row rows to be upserted
+   * @param schema schema of the input row (fields without the primary key)
+   * @throws CarbonException if any error occurs
+   */
+  void upsert(Iterator<KeyedRow> row, StructType schema) throws CarbonException;
+
+  /**
+   * Delete a batch of rows
+   * @param keys keys to be deleted
+   * @throws CarbonException if any error occurs
+   */
+  void delete(Iterator<PrimaryKey> keys) throws CarbonException;
+
+
+  ////////////////////////////////////////////////////////////////////
+  /////                      Read Operation                      /////
+  ////////////////////////////////////////////////////////////////////
+
+  /**
+   * Scan the specified table and return matched rows
+   *
+   * @param select descriptor for scan operation
+   * @return matched rows
+   * @throws CarbonException if any error occurs
+   */
+  List<CarbonRow> scan(ScanDescriptor select) throws CarbonException;
+
+  /**
+   * Look up and return the row with the specified primary key
+   * @param key key to lookup
+   * @return matched row for the specified key
+   * @throws CarbonException if any error occurs
+   */
+  Row lookup(PrimaryKey key) throws CarbonException;
+
+  /**
+   * Look up by filter expression and return a list of matched rows
+   *
+   * @param tableIdentifier table identifier
+   * @param filterExpression filter expression, like "col3 = 1"
+   * @return matched rows for the specified filter
+   * @throws CarbonException if any error occurs
+   */
+  List<Row> lookup(TableIdentifier tableIdentifier, String filterExpression) throws CarbonException;
+}