Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/08/06 11:04:51 UTC

[4/5] carbondata git commit: [CARBONDATA-2825][CARBONDATA-2828] CarbonStore and InternalCarbonStore API This closes #2589

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java
new file mode 100644
index 0000000..fdcffdf
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanUnit.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * A unit of work for the scanner in Carbon Store
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface ScanUnit extends Serializable, Writable {
+
+  /**
+   * Return the list of preferred locations of this ScanUnit.
+   * The default return value is an empty string array, which means this ScanUnit
+   * has no location preference.
+   */
+  default String[] preferredLocations() {
+    return new String[0];
+  }
+}
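
A minimal sketch of a custom ScanUnit, for integrators. The class name and the
one-file-per-unit layout are hypothetical; the write/readFields contract
follows the built-in BlockScanUnit implementation later in this commit.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FileScanUnit implements ScanUnit {
      // hypothetical: one data file per scan unit
      private String filePath;

      // no-arg constructor so the unit can be rebuilt via readFields()
      public FileScanUnit() { }

      public FileScanUnit(String filePath) {
        this.filePath = filePath;
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeUTF(filePath);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        filePath = in.readUTF();
      }

      // preferredLocations() keeps the default: no locality preference
    }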

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java
new file mode 100644
index 0000000..e56eee2
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/Scanner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * A Scanner is used to scan a table in a distributed compute
+ * engine such as Apache Spark.
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface Scanner<T> extends DataScanner<T>, Pruner, Serializable {
+
+}
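
A hedged usage sketch: prune to get scan units, then scan each unit. The
signatures follow Pruner and DataScanner as implemented by DelegatedScanner
later in this commit; `scanner` (a Scanner<CarbonRow>), `table` (a
TableIdentifier) and `filter` (an Expression) are assumed to exist, and the
per-batch row consumption is elided since the ResultBatch API is not part of
this excerpt.

    List<ScanUnit> units = scanner.prune(table, filter);
    for (ScanUnit unit : units) {
      // in a compute engine, each unit typically becomes one task,
      // scheduled near unit.preferredLocations()
      Iterator<? extends ResultBatch<CarbonRow>> batches = scanner.scan(unit);
      while (batches.hasNext()) {
        ResultBatch<CarbonRow> batch = batches.next();
        // consume the rows in this batch
      }
    }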

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java
new file mode 100644
index 0000000..29b3bce
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/TransactionalOperation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface TransactionalOperation {
+  /**
+   * Commit the transaction when the operation succeeds.
+   */
+  void commit();
+
+  /**
+   * Close the transaction when the operation fails.
+   */
+  void close();
+}
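
A hedged sketch of the intended contract: commit when the operation succeeds,
close when it fails. The helper below is illustrative, not part of the API.

    static void runTransactionally(TransactionalOperation op, Runnable work) {
      try {
        work.run();
        op.commit();  // operation succeeded
      } catch (RuntimeException e) {
        op.close();   // operation failed, abandon the transaction
        throw e;
      }
    }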

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java b/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java
new file mode 100644
index 0000000..d7c2194
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/BlockScanUnit.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.store.devapi.ScanUnit;
+
+/**
+ * It contains a block to scan, and the destination worker that should scan it
+ */
+@InterfaceAudience.Internal
+public class BlockScanUnit implements ScanUnit {
+
+  // the data block to scan
+  private CarbonInputSplit inputSplit;
+
+  // the worker who should scan this unit
+  private Schedulable schedulable;
+
+  public BlockScanUnit() {
+  }
+
+  public BlockScanUnit(CarbonInputSplit inputSplit, Schedulable schedulable) {
+    this.inputSplit = inputSplit;
+    this.schedulable = schedulable;
+  }
+
+  public CarbonInputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  public Schedulable getSchedulable() {
+    return schedulable;
+  }
+
+  @Override
+  public String[] preferredLocations() {
+    return inputSplit.preferredLocations();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    inputSplit.write(out);
+    schedulable.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    inputSplit = new CarbonInputSplit();
+    inputSplit.readFields(in);
+    schedulable = new Schedulable();
+    schedulable.readFields(in);
+  }
+}
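
Since BlockScanUnit is Writable, it can be shipped to remote tasks by
serializing it. A hedged round-trip sketch using java.io streams, assuming
`unit` is an existing BlockScanUnit:

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    unit.write(new DataOutputStream(bos));      // serialize

    BlockScanUnit copy = new BlockScanUnit();   // no-arg constructor, then readFields
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));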

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java b/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
deleted file mode 100644
index 7e50102..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/CarbonStoreBase.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.model.QueryModelBuilder;
-import org.apache.carbondata.core.util.CarbonTaskInfo;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.api.CarbonInputFormat;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Provides base functionality of CarbonStore, it contains basic implementation of metadata
- * management, data pruning and data scan logic.
- */
-@InterfaceAudience.Internal
-public abstract class CarbonStoreBase implements CarbonStore {
-
-  private static LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonStoreBase.class.getCanonicalName());
-
-  MetaProcessor metaProcessor;
-  private StoreConf storeConf;
-
-  CarbonStoreBase(StoreConf storeConf) {
-    this.storeConf = storeConf;
-    this.metaProcessor = new MetaProcessor(this);
-  }
-
-  @Override
-  public void createTable(TableDescriptor table) throws IOException, StoreException {
-    Objects.requireNonNull(table);
-    metaProcessor.createTable(table);
-  }
-
-  @Override
-  public void dropTable(TableIdentifier table) throws IOException {
-    Objects.requireNonNull(table);
-    metaProcessor.dropTable(table);
-  }
-
-  @Override
-  public CarbonTable getTable(TableIdentifier table) throws IOException {
-    Objects.requireNonNull(table);
-    return metaProcessor.getTable(table);
-  }
-
-  public String getTablePath(String tableName, String databaseName) {
-    Objects.requireNonNull(tableName);
-    Objects.requireNonNull(databaseName);
-    return String.format("%s/%s", storeConf.storeLocation(), tableName);
-  }
-
-  /**
-   * Prune data by using CarbonInputFormat.getSplit
-   * Return a mapping of host address to list of block.
-   * This should be invoked in driver side.
-   */
-  public static List<Distributable> pruneBlock(CarbonTable table, String[] columns,
-      Expression filter) throws IOException {
-    Objects.requireNonNull(table);
-    Objects.requireNonNull(columns);
-    JobConf jobConf = new JobConf(new Configuration());
-    Job job = new Job(jobConf);
-    CarbonTableInputFormat format;
-    try {
-      format = CarbonInputFormatUtil.createCarbonTableInputFormat(
-          job, table, columns, filter, null, null, true);
-    } catch (InvalidConfigurationException e) {
-      throw new IOException(e.getMessage());
-    }
-
-    // We will do FG pruning in reader side, so don't do it here
-    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false);
-    List<InputSplit> splits = format.getSplits(job);
-    List<Distributable> blockInfos = new ArrayList<>(splits.size());
-    for (InputSplit split : splits) {
-      blockInfos.add((Distributable) split);
-    }
-    return blockInfos;
-  }
-
-  /**
-   * Scan data and return matched rows. This should be invoked in worker side.
-   * @param table carbon table
-   * @param scan scan parameter
-   * @return matched rows
-   * @throws IOException if IO error occurs
-   */
-  public static List<CarbonRow> scan(CarbonTable table, Scan scan) throws IOException {
-    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
-    carbonTaskInfo.setTaskId(System.nanoTime());
-    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
-
-    CarbonMultiBlockSplit mbSplit = scan.getSplit();
-    long limit = scan.getLimit();
-    QueryModel queryModel = createQueryModel(table, scan);
-
-    LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", scan.getRequestId(),
-        queryModel.toString(), mbSplit.getAllSplits().size()));
-
-    // read all rows by the reader
-    List<CarbonRow> rows = new LinkedList<>();
-    try (CarbonRecordReader<CarbonRow> reader = new IndexedRecordReader(scan.getRequestId(),
-        table, queryModel)) {
-      reader.initialize(mbSplit, null);
-
-      // loop to read required number of rows.
-      // By default, if user does not specify the limit value, limit is Long.MaxValue
-      long rowCount = 0;
-      while (reader.nextKeyValue() && rowCount < limit) {
-        rows.add(reader.getCurrentValue());
-        rowCount++;
-      }
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-    LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows",
-        scan.getRequestId(), rows.size()));
-    return rows;
-  }
-
-  private static QueryModel createQueryModel(CarbonTable table, Scan scan) {
-    String[] projectColumns = scan.getProjectColumns();
-    Expression filter = null;
-    if (scan.getFilterExpression() != null) {
-      filter = scan.getFilterExpression();
-    }
-    return new QueryModelBuilder(table)
-        .projectColumns(projectColumns)
-        .filterExpression(filter)
-        .build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java
new file mode 100644
index 0000000..fc50a62
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/DataOperation.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.util.CarbonTaskInfo;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+
+public class DataOperation {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataOperation.class.getCanonicalName());
+
+  /**
+   * Scan data and return matched rows. This should be invoked on the worker side.
+   * @param tableInfo carbon table info
+   * @param scan scan parameter
+   * @return matched rows
+   * @throws IOException if IO error occurs
+   */
+  public static List<CarbonRow> scan(TableInfo tableInfo, ScanRequest scan) throws IOException {
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+    CarbonTaskInfo carbonTaskInfo = new CarbonTaskInfo();
+    carbonTaskInfo.setTaskId(System.nanoTime());
+    ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo);
+
+    CarbonMultiBlockSplit mbSplit = scan.getSplit();
+    long limit = scan.getLimit();
+    QueryModel queryModel = createQueryModel(table, scan);
+
+    LOGGER.info(String.format("[QueryId:%d] %s, number of block: %d", scan.getRequestId(),
+        queryModel.toString(), mbSplit.getAllSplits().size()));
+
+    // read all matched rows with the reader
+    List<CarbonRow> rows = new LinkedList<>();
+    try (CarbonRecordReader<CarbonRow> reader = new IndexedRecordReader(scan.getRequestId(),
+        table, queryModel)) {
+      reader.initialize(mbSplit, null);
+
+      // loop to read the required number of rows.
+      // By default, if the user does not specify a limit, the limit is Long.MAX_VALUE
+      long rowCount = 0;
+      while (reader.nextKeyValue() && rowCount < limit) {
+        rows.add(reader.getCurrentValue());
+        rowCount++;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    LOGGER.info(String.format("[QueryId:%d] scan completed, return %d rows",
+        scan.getRequestId(), rows.size()));
+    return rows;
+  }
+
+  private static QueryModel createQueryModel(CarbonTable table, ScanRequest scan) {
+    String[] projectColumns = scan.getProjectColumns();
+    Expression filter = null;
+    if (scan.getFilterExpression() != null) {
+      filter = scan.getFilterExpression();
+    }
+    return new QueryModelBuilder(table)
+        .projectColumns(projectColumns)
+        .filterExpression(filter)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java
new file mode 100644
index 0000000..8d99c3e
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/DataServicePool.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.ServiceFactory;
+
+class DataServicePool {
+  private static final Map<Schedulable, DataService> dataServicePool = new ConcurrentHashMap<>();
+
+  private DataServicePool() {
+  }
+
+  static DataService getOrCreateDataService(Schedulable schedulable) throws IOException {
+    // Look up the cache first so a new connection is not created on every call
+    DataService service = dataServicePool.get(schedulable);
+    if (service == null) {
+      service = ServiceFactory.createDataService(schedulable.getAddress(), schedulable.getPort());
+      // Another thread may have registered a service concurrently; keep the winner
+      DataService existing = dataServicePool.putIfAbsent(schedulable, service);
+      if (existing != null) {
+        service = existing;
+      }
+    }
+    return service;
+  }
+
+}
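
An alternative to the get/putIfAbsent sequence above is
ConcurrentHashMap.computeIfAbsent, which guarantees at most one service is
created per key. A sketch assuming the same ServiceFactory call, with the
checked IOException tunneled through the lambda:

    static DataService getOrCreateDataService(Schedulable schedulable) throws IOException {
      try {
        return dataServicePool.computeIfAbsent(schedulable, s -> {
          try {
            return ServiceFactory.createDataService(s.getAddress(), s.getPort());
          } catch (IOException e) {
            throw new java.io.UncheckedIOException(e);
          }
        });
      } catch (java.io.UncheckedIOException e) {
        throw e.getCause();  // UncheckedIOException.getCause() returns the IOException
      }
    }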

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java
new file mode 100644
index 0000000..bd40667
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/DelegatedScanner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.devapi.Scanner;
+
+public class DelegatedScanner<T> implements Scanner<T> {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DelegatedScanner.class.getCanonicalName());
+
+  private Pruner pruner;
+  private DataScanner<T> scanner;
+
+  public DelegatedScanner(Pruner pruner, DataScanner<T> scanner) {
+    this.pruner = pruner;
+    this.scanner = scanner;
+  }
+
+  @Override
+  public List<ScanUnit> prune(TableIdentifier table, Expression filterExpression)
+      throws CarbonException {
+    return pruner.prune(table, filterExpression);
+  }
+
+  @Override
+  public Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException {
+    return scanner.scan(input);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
deleted file mode 100644
index 3667aea..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/DistributedCarbonStore.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.Distributable;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
-import org.apache.carbondata.processing.util.CarbonLoaderUtil;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.exception.ExecutionTimeoutException;
-import org.apache.carbondata.store.api.exception.StoreException;
-import org.apache.carbondata.store.impl.master.Schedulable;
-import org.apache.carbondata.store.impl.master.Scheduler;
-import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
-import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-
-/**
- * A CarbonStore that leverage multiple servers via RPC calls (Master and Workers)
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-class DistributedCarbonStore extends CarbonStoreBase {
-  private static LogService LOGGER =
-      LogServiceFactory.getLogService(DistributedCarbonStore.class.getCanonicalName());
-  private SegmentTxnManager txnManager;
-  private Scheduler scheduler;
-  private Random random = new Random();
-
-  DistributedCarbonStore(StoreConf storeConf) throws IOException {
-    super(storeConf);
-    this.scheduler = new Scheduler(storeConf);
-    txnManager = SegmentTxnManager.getInstance();
-  }
-
-  @Override
-  public void loadData(LoadDescriptor load) throws IOException, StoreException {
-    Objects.requireNonNull(load);
-    CarbonTable table = metaProcessor.getTable(load.getTable());
-    CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    builder.setInputPath(load.getInputPath());
-    CarbonLoadModel loadModel;
-    try {
-      loadModel = builder.build(load.getOptions(), System.currentTimeMillis(), "0");
-    } catch (InvalidLoadOptionException e) {
-      LOGGER.error(e, "Invalid loadDescriptor options");
-      throw new StoreException(e);
-    } catch (IOException e) {
-      LOGGER.error(e, "Failed to loadDescriptor data");
-      throw e;
-    }
-
-    Schedulable worker = scheduler.pickNexWorker();
-    try {
-      if (loadModel.getFactTimeStamp() == 0) {
-        loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime());
-      }
-      txnManager.openSegment(loadModel, load.isOverwrite());
-      LoadDataRequest request = new LoadDataRequest(loadModel);
-      BaseResponse response = scheduler.sendRequest(worker, request);
-      if (Status.SUCCESS.ordinal() == response.getStatus()) {
-        txnManager.commitSegment(loadModel);
-      } else {
-        txnManager.closeSegment(loadModel);
-        throw new StoreException(response.getMessage());
-      }
-    } finally {
-      worker.workload.decrementAndGet();
-    }
-  }
-
-  @Override
-  public List<CarbonRow> select(SelectDescriptor select) throws IOException, StoreException {
-    Objects.requireNonNull(select);
-    CarbonTable carbonTable = metaProcessor.getTable(select.getTable());
-    return select(
-        carbonTable,
-        select.getProjection(),
-        select.getFilter(),
-        select.getLimit(),
-        select.getLimit());
-  }
-
-  /**
-   * Execute search by firing RPC call to worker, return the result rows
-   *
-   * @param table       table to search
-   * @param columns     projection column names
-   * @param filter      filter expression
-   * @param globalLimit max number of rows required in Master
-   * @param localLimit  max number of rows required in Worker
-   * @return CarbonRow
-   */
-  private List<CarbonRow> select(CarbonTable table, String[] columns, Expression filter,
-      long globalLimit, long localLimit) throws IOException {
-    Objects.requireNonNull(table);
-    Objects.requireNonNull(columns);
-    if (globalLimit < 0 || localLimit < 0) {
-      throw new IllegalArgumentException("limit should be positive");
-    }
-
-    int queryId = random.nextInt();
-
-    List<CarbonRow> output = new ArrayList<>();
-
-    // prune data and get a mapping of worker hostname to list of blocks,
-    // then add these blocks to the Scan and fire the RPC call
-    List<Distributable> blockInfos = pruneBlock(table, columns, filter);
-
-    Map<String, List<Distributable>> nodeBlockMapping =
-        CarbonLoaderUtil.nodeBlockMapping(
-            blockInfos, -1, scheduler.getAllWorkerAddresses(),
-            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, null);
-
-    Set<Map.Entry<String, List<Distributable>>> entries = nodeBlockMapping.entrySet();
-    List<Future<QueryResponse>> futures = new ArrayList<>(entries.size());
-    List<Schedulable> workers = new ArrayList<>(entries.size());
-    for (Map.Entry<String, List<Distributable>> entry : entries) {
-      CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(entry.getValue(), entry.getKey());
-      Scan scan =
-          new Scan(queryId, split, table.getTableInfo(), columns, filter, localLimit);
-
-      // Find an Endpoind and send the request to it
-      // This RPC is non-blocking so that we do not need to wait before send to next worker
-      Schedulable worker = scheduler.pickWorker(entry.getKey());
-      workers.add(worker);
-      futures.add(scheduler.sendRequestAsync(worker, scan));
-    }
-
-    int rowCount = 0;
-    int length = futures.size();
-    for (int i = 0; i < length; i++) {
-      Future<QueryResponse> future = futures.get(i);
-      Schedulable worker = workers.get(i);
-      if (rowCount < globalLimit) {
-        // wait for worker
-        QueryResponse response = null;
-        try {
-          response = future
-              .get((long) (CarbonProperties.getInstance().getQueryTimeout()), TimeUnit.SECONDS);
-        } catch (ExecutionException | InterruptedException e) {
-          throw new IOException("exception in worker: " + e.getMessage());
-        } catch (TimeoutException t) {
-          throw new ExecutionTimeoutException();
-        } finally {
-          worker.workload.decrementAndGet();
-        }
-        LOGGER.info("[QueryId: " + queryId + "] receive search response from worker " + worker);
-        rowCount += onSuccess(queryId, response, output, globalLimit);
-      }
-    }
-    return output;
-  }
-
-  private int onSuccess(int queryId, QueryResponse result, List<CarbonRow> output, long globalLimit)
-      throws IOException {
-    // in case of RPC success, collect all rows in response message
-    if (result.getQueryId() != queryId) {
-      throw new IOException(
-          "queryId in response does not match request: " + result.getQueryId() + " != " + queryId);
-    }
-    if (result.getStatus() != Status.SUCCESS.ordinal()) {
-      throw new IOException("failure in worker: " + result.getMessage());
-    }
-    int rowCount = 0;
-    Object[][] rows = result.getRows();
-    for (Object[] row : rows) {
-      output.add(new CarbonRow(row));
-      rowCount++;
-      if (rowCount >= globalLimit) {
-        break;
-      }
-    }
-    LOGGER.info("[QueryId:" + queryId + "] accumulated result size " + rowCount);
-    return rowCount;
-  }
-
-  @Override
-  public void close() throws IOException {
-    LOGGER.info("Shutting down all workers...");
-    scheduler.stopAllWorkers();
-    LOGGER.info("All workers are shut down");
-    try {
-      LOGGER.info("Stopping master...");
-      scheduler.stopService();
-      LOGGER.info("Master stopped");
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java
new file mode 100644
index 0000000..49afb45
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexOperation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.api.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+
+public class IndexOperation {
+
+  /**
+   * Prune data by leveraging Carbon's Index
+   */
+  public static List<CarbonInputSplit> pruneBlock(TableInfo tableInfo, Expression filter)
+      throws IOException {
+    Objects.requireNonNull(tableInfo);
+    CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+    JobConf jobConf = new JobConf(new Configuration());
+    Job job = new Job(jobConf);
+    CarbonTableInputFormat format;
+    try {
+      // We only want to prune, so pass an empty projection column list
+      format = CarbonInputFormatUtil.createCarbonTableInputFormat(
+          job, table, new String[0], filter, null, null, true);
+    } catch (InvalidConfigurationException e) {
+      throw new IOException(e.getMessage());
+    }
+
+    // FG (fine-grained) datamap pruning is done on the reader side, so disable it here
+    CarbonInputFormat.setFgDataMapPruning(job.getConfiguration(), false);
+    return format.getSplits(job);
+  }
+}
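
A hedged sketch of how pruning feeds scanning: the pruned splits are wrapped
in a CarbonMultiBlockSplit and scanned through DataOperation. This mirrors
LocalCarbonStore.scan() later in this commit; requestId 0 and the empty
location array follow that code, and `tableInfo`, `projection`, `filter` and
`limit` are assumed inputs.

    List<CarbonInputSplit> blocks = IndexOperation.pruneBlock(tableInfo, filter);
    CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, new String[0]);
    ScanRequest request =
        new ScanRequest(0, split, tableInfo, projection, filter, limit);
    List<CarbonRow> rows = DataOperation.scan(tableInfo, request);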

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
index 64f0742..c856d18 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/IndexedRecordReader.java
@@ -65,7 +65,7 @@ class IndexedRecordReader extends CarbonRecordReader<CarbonRow> {
   private int queryId;
   private CarbonTable table;
 
-  public IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) {
+  IndexedRecordReader(int queryId, CarbonTable table, QueryModel queryModel) {
     super(queryModel, new CarbonRowReadSupport());
     this.queryId = queryId;
     this.table = table;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java
new file mode 100644
index 0000000..4821116
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/InternalCarbonStoreImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.sdk.store.DistributedCarbonStore;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataLoader;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.InternalCarbonStore;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ScanOption;
+import org.apache.carbondata.store.devapi.Scanner;
+
+/**
+ * This store prunes and scans either remotely, by sending RPCs to the Master/Worker,
+ * or in the local JVM, depending on the scan options passed.
+ */
+@InterfaceAudience.Internal
+public class InternalCarbonStoreImpl extends DistributedCarbonStore implements InternalCarbonStore {
+
+  private Map<TableIdentifier, CarbonTable> tableCache = new ConcurrentHashMap<>();
+  private StoreConf storeConf;
+
+  public InternalCarbonStoreImpl(StoreConf storeConf) throws IOException {
+    super(storeConf);
+    this.storeConf = storeConf;
+  }
+
+  @Override
+  public CarbonTable getCarbonTable(TableIdentifier tableIdentifier)
+      throws CarbonException {
+    Objects.requireNonNull(tableIdentifier);
+    // Look up the cache first so the table metadata is fetched only on a cache miss
+    CarbonTable carbonTable = tableCache.get(tableIdentifier);
+    if (carbonTable == null) {
+      carbonTable = CarbonTable.buildFromTableInfo(storeService.getTable(tableIdentifier));
+      CarbonTable existing = tableCache.putIfAbsent(tableIdentifier, carbonTable);
+      if (existing != null) {
+        carbonTable = existing;
+      }
+    }
+    return carbonTable;
+  }
+
+  @Override
+  public DataLoader newLoader(LoadDescriptor load) throws CarbonException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * By default, it returns a Scanner that prunes and scans remotely.
+   */
+  @Override
+  public <T> Scanner<T> newScanner(TableIdentifier identifier, ScanDescriptor scanDescriptor,
+      Map<String, String> scanOption, CarbonReadSupport<T> readSupport)
+      throws CarbonException {
+    Objects.requireNonNull(identifier);
+    Objects.requireNonNull(scanDescriptor);
+    boolean isRemotePrune;
+    boolean isOpPushdown;
+    if (scanOption == null) {
+      isRemotePrune = true;
+      isOpPushdown = true;
+    } else {
+      isRemotePrune = ScanOption.isRemotePrune(scanOption);
+      isOpPushdown = ScanOption.isOperatorPushdown(scanOption);
+    }
+
+    TableInfo tableInfo = MetaOperation.getTable(identifier, storeConf);
+    Pruner pruner;
+    DataScanner<T> scanner;
+
+    if (isRemotePrune) {
+      pruner = new RemotePruner(storeConf.masterHost(), storeConf.pruneServicePort());
+    } else {
+      pruner = new LocalPruner(storeConf);
+    }
+    if (isOpPushdown) {
+      scanner = new RemoteDataScanner<>(tableInfo, scanDescriptor, scanOption, readSupport);
+    } else {
+      scanner = new LocalDataScanner<>(storeConf, scanDescriptor, scanOption);
+    }
+
+    return new DelegatedScanner<>(pruner, scanner);
+  }
+
+}
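
A hedged usage sketch: passing a null scan option map selects the default
combination (remote prune plus operator pushdown). The identifier, scan
descriptor, filter and read support instances are assumed to be built
elsewhere; the option keys consulted by ScanOption are not shown in this
excerpt.

    InternalCarbonStore store = new InternalCarbonStoreImpl(storeConf);
    Scanner<CarbonRow> scanner =
        store.newScanner(identifier, scanDescriptor, null, readSupport);
    List<ScanUnit> units = scanner.prune(identifier, filter);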

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
index 40b6bdf..bd8fafe 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalCarbonStore.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.store.impl;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
@@ -28,11 +29,13 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
@@ -40,12 +43,17 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.exception.StoreException;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.KeyedRow;
+import org.apache.carbondata.sdk.store.PrimaryKey;
+import org.apache.carbondata.sdk.store.Row;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -63,7 +71,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  */
 @InterfaceAudience.User
 @InterfaceStability.Unstable
-class LocalCarbonStore extends CarbonStoreBase {
+public class LocalCarbonStore extends MetaOperation implements CarbonStore {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
@@ -72,11 +80,11 @@ class LocalCarbonStore extends CarbonStoreBase {
   private Configuration configuration;
   private SegmentTxnManager txnManager;
 
-  LocalCarbonStore(StoreConf storeConf) {
+  public LocalCarbonStore(StoreConf storeConf) {
     this(storeConf, new Configuration());
   }
 
-  LocalCarbonStore(StoreConf storeConf, Configuration hadoopConf) {
+  private LocalCarbonStore(StoreConf storeConf, Configuration hadoopConf) {
     super(storeConf);
     this.storeConf = storeConf;
     this.txnManager = SegmentTxnManager.getInstance();
@@ -84,17 +92,20 @@ class LocalCarbonStore extends CarbonStoreBase {
   }
 
   @Override
-  public void loadData(LoadDescriptor load) throws IOException, StoreException {
+  public void loadData(LoadDescriptor load) throws CarbonException {
     Objects.requireNonNull(load);
-    CarbonTable table = metaProcessor.getTable(load.getTable());
-    CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
-    modelBuilder.setInputPath(load.getInputPath());
     CarbonLoadModel loadModel;
     try {
+      TableInfo tableInfo = getTable(load.getTable(), storeConf);
+      CarbonTable table = CarbonTable.buildFromTableInfo(tableInfo);
+      CarbonLoadModelBuilder modelBuilder = new CarbonLoadModelBuilder(table);
+      modelBuilder.setInputPath(load.getInputPath());
       loadModel = modelBuilder.build(load.getOptions(), System.currentTimeMillis(), "0");
     } catch (InvalidLoadOptionException e) {
       LOGGER.error(e, "Invalid loadDescriptor options");
-      throw new StoreException(e.getMessage());
+      throw new CarbonException(e);
+    } catch (IOException e) {
+      throw new CarbonException(e);
     }
 
     if (loadModel.getFactTimeStamp() == 0) {
@@ -106,12 +117,27 @@ class LocalCarbonStore extends CarbonStoreBase {
       loadData(loadModel);
       txnManager.commitSegment(loadModel);
     } catch (Exception e) {
-      txnManager.closeSegment(loadModel);
       LOGGER.error(e, "Failed to load data");
-      throw new StoreException(e);
+      try {
+        txnManager.closeSegment(loadModel);
+      } catch (IOException ex) {
+        LOGGER.error(ex, "Failed to close segment");
+        // Ignoring the exception
+      }
+      throw new CarbonException(e);
     }
   }
 
+  @Override
+  public void upsert(Iterator<KeyedRow> row, StructType schema) throws CarbonException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void delete(Iterator<PrimaryKey> keys) throws CarbonException {
+    throw new UnsupportedOperationException();
+  }
+
   private void loadData(CarbonLoadModel model) throws Exception {
     DataLoadExecutor executor = null;
     try {
@@ -146,19 +172,34 @@ class LocalCarbonStore extends CarbonStoreBase {
   }
 
   @Override
-  public List<CarbonRow> select(SelectDescriptor select) throws IOException {
-    Objects.requireNonNull(select);
-    CarbonTable table = metaProcessor.getTable(select.getTable());
-    List<Distributable> blocks = pruneBlock(table, select.getProjection(), select.getFilter());
-    CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, "");
-    Scan scan = new Scan(
-        0, split, table.getTableInfo(), select.getProjection(), select.getFilter(),
-        select.getLimit());
-    return scan(table, scan);
+  public List<CarbonRow> scan(ScanDescriptor scanDescriptor) throws CarbonException {
+    Objects.requireNonNull(scanDescriptor);
+    try {
+      TableInfo tableInfo = getTable(scanDescriptor.getTableIdentifier(), storeConf);
+      List<CarbonInputSplit> blocks =
+          IndexOperation.pruneBlock(tableInfo, scanDescriptor.getFilter());
+      CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, new String[0]);
+      ScanRequest scan =
+          new ScanRequest(0, split, tableInfo, scanDescriptor.getProjection(),
+              scanDescriptor.getFilter(), scanDescriptor.getLimit());
+      return DataOperation.scan(tableInfo, scan);
+    } catch (IOException e) {
+      throw new CarbonException(e);
+    }
+  }
+
+  @Override
+  public Row lookup(PrimaryKey key) throws CarbonException {
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public void close() throws IOException {
+  public List<Row> lookup(TableIdentifier tableIdentifier, String filterExpression)
+      throws CarbonException {
+    throw new UnsupportedOperationException();
+  }
 
+  @Override
+  public void close() throws IOException {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java
new file mode 100644
index 0000000..c3429a4
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalDataScanner.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+
+/**
+ * This scanner prunes and scans in the local JVM.
+ * @param <T> scan output row type
+ */
+public class LocalDataScanner<T> implements DataScanner<T> {
+
+  private StoreConf storeConf;
+  private ScanDescriptor scanDescriptor;
+  private Map<String, String> scanOption;
+
+  LocalDataScanner(StoreConf storeConf, ScanDescriptor scanDescriptor,
+      Map<String, String> scanOption) {
+    this.storeConf = storeConf;
+    this.scanDescriptor = scanDescriptor;
+    this.scanOption = scanOption;
+  }
+
+  @Override
+  public Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException {
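+    // Note: the given ScanUnit is not used here; blocks are re-pruned locally
+    // from the table and the descriptor's filter, and all of them are scanned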
+    Objects.requireNonNull(scanDescriptor);
+    try {
+      TableInfo tableInfo = MetaOperation.getTable(scanDescriptor.getTableIdentifier(), storeConf);
+      List<CarbonInputSplit> blocks =
+          IndexOperation.pruneBlock(tableInfo, scanDescriptor.getFilter());
+      CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(blocks, new String[0]);
+      ScanRequest scan =
+          new ScanRequest(0, split, tableInfo, scanDescriptor.getProjection(),
+              scanDescriptor.getFilter(), scanDescriptor.getLimit());
+      List<T> rows = (List<T>) DataOperation.scan(tableInfo, scan);
+      RowMajorResultBatch<T> resultBatch = new RowMajorResultBatch<>(rows);
+      return Collections.singletonList(resultBatch).iterator();
+    } catch (IOException e) {
+      throw new CarbonException(e);
+    }
+  }
+}
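
A usage sketch (editorial, not part of this patch): draining the result of a
scan. It assumes a DataScanner<Object[]> and a ScanUnit are already in scope;
LocalDataScanner currently returns a single RowMajorResultBatch per scan():

    Iterator<? extends ResultBatch<Object[]>> batches = scanner.scan(unit);
    while (batches.hasNext()) {
      ResultBatch<Object[]> batch = batches.next();
      while (batch.hasNext()) {
        Object[] row = batch.next();
        // process the row here
      }
    }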

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java
new file mode 100644
index 0000000..734072c
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/LocalPruner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ScanUnit;
+
+public class LocalPruner implements Pruner {
+
+  private StoreConf storeConf;
+
+  LocalPruner(StoreConf storeConf) {
+    this.storeConf = storeConf;
+  }
+
+  @Override
+  public List<ScanUnit> prune(TableIdentifier identifier, Expression filterExpression)
+      throws CarbonException {
+    try {
+      TableInfo table = MetaOperation.getTable(identifier, storeConf);
+      List<CarbonInputSplit> splits = IndexOperation.pruneBlock(table, filterExpression);
+      return splits.stream().map(
+          (Function<CarbonInputSplit, ScanUnit>) inputSplit ->
+              // LocalDataScanner scans in the local JVM and sends no RPC to a
+              // schedulable (Worker), so the schedulable can be null here
+              new BlockScanUnit(inputSplit, null)
+      ).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new CarbonException(e);
+    }
+  }
+}
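
To make the dev-api flow concrete, a sketch (not in this patch) combining
Pruner and DataScanner end to end. Only the two interfaces are assumed;
constructing the concrete LocalPruner/LocalDataScanner happens inside this
package, since their constructors are package-private:

    static <T> List<T> pruneAndScan(Pruner pruner, DataScanner<T> scanner,
        TableIdentifier table, Expression filter) throws CarbonException {
      List<T> rows = new ArrayList<>();
      // every pruned ScanUnit is scanned and its batches drained into one list
      for (ScanUnit unit : pruner.prune(table, filter)) {
        Iterator<? extends ResultBatch<T>> batches = scanner.scan(unit);
        while (batches.hasNext()) {
          ResultBatch<T> batch = batches.next();
          while (batch.hasNext()) {
            rows.add(batch.next());
          }
        }
      }
      return rows;
    }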

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
new file mode 100644
index 0000000..3ca1ffe
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.ThriftWriter;
+import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * Provides table management.
+ */
+@InterfaceAudience.Internal
+public class MetaOperation {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(MetaOperation.class.getCanonicalName());
+
+  private StoreConf storeConf;
+
+  // mapping of table path to TableInfo object; the cache is static and shared,
+  // so use a concurrent map since multiple scanners may resolve tables in parallel
+  private static final Map<String, TableInfo> cache = new ConcurrentHashMap<>();
+
+  public MetaOperation(StoreConf storeConf) {
+    this.storeConf = storeConf;
+  }
+
+  public void createTable(TableDescriptor descriptor) throws CarbonException {
+    TableIdentifier table = descriptor.getTable();
+    Field[] fields = descriptor.getSchema().getFields();
+    // sort_columns
+    List<String> sortColumnsList = null;
+    try {
+      sortColumnsList = descriptor.getSchema().prepareSortColumns(descriptor.getProperties());
+    } catch (MalformedCarbonCommandException e) {
+      throw new CarbonException(e);
+    }
+    ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
+
+    TableSchemaBuilder builder = TableSchema.builder();
+    CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
+
+    TableSchema schema = builder.tableName(table.getTableName())
+        .properties(descriptor.getProperties())
+        .setSortColumns(Arrays.asList(sortColumnsSchemaList))
+        .build();
+
+    SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+    schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+    schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
+    schema.setTableName(table.getTableName());
+
+    String tablePath = descriptor.getTablePath();
+    if (tablePath == null) {
+      tablePath = getTablePath(table.getTableName(), table.getDatabaseName());
+    }
+
+    TableInfo tableInfo = CarbonTable.builder()
+        .databaseName(table.getDatabaseName())
+        .tableName(table.getTableName())
+        .tablePath(tablePath)
+        .tableSchema(schema)
+        .isTransactionalTable(true)
+        .buildTableInfo();
+
+    try {
+      createTable(tableInfo, descriptor.isIfNotExists());
+    } catch (IOException e) {
+      LOGGER.error(e, "create tableDescriptor failed");
+      throw new CarbonException(e);
+    }
+  }
+
+  private void createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException {
+    AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
+    boolean tableExists = FileFactory.isFileExist(identifier.getTablePath());
+    if (tableExists) {
+      if (ifNotExists) {
+        return;
+      } else {
+        throw new IOException(
+            "can't create table " + tableInfo.getDatabaseName() + "." + tableInfo.getFactTable()
+                .getTableName() + ", because it already exists");
+      }
+    }
+
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    String databaseName = tableInfo.getDatabaseName();
+    String tableName = tableInfo.getFactTable().getTableName();
+    org.apache.carbondata.format.TableInfo thriftTableInfo =
+        schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName);
+
+    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
+    FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
+    try {
+      if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+        boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType);
+        if (!isDirCreated) {
+          throw new IOException("Failed to create the metadata directory " + schemaMetadataPath);
+        }
+      }
+      ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
+      thriftWriter.open(FileWriteOperation.OVERWRITE);
+      try {
+        thriftWriter.write(thriftTableInfo);
+      } finally {
+        // release the schema file handle even if the write fails
+        thriftWriter.close();
+      }
+    } catch (IOException e) {
+      LOGGER.error(e, "Failed to handle create table");
+      throw e;
+    }
+  }
+
+  public void dropTable(TableIdentifier table) throws CarbonException {
+    String tablePath = getTablePath(table.getTableName(), table.getDatabaseName());
+    cache.remove(tablePath);
+    try {
+      FileFactory.deleteFile(tablePath);
+    } catch (IOException e) {
+      throw new CarbonException(e);
+    }
+  }
+
+  public TableInfo getTable(TableIdentifier table) throws CarbonException {
+    return getTable(table, storeConf);
+  }
+
+  public static TableInfo getTable(TableIdentifier table, StoreConf storeConf)
+      throws CarbonException {
+    String tablePath = getTablePath(table.getTableName(), table.getDatabaseName(), storeConf);
+    if (cache.containsKey(tablePath)) {
+      return cache.get(tablePath);
+    } else {
+      org.apache.carbondata.format.TableInfo formatTableInfo = null;
+      try {
+        formatTableInfo = CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
+      } catch (IOException e) {
+        throw new CarbonException(e);
+      }
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+          formatTableInfo, table.getDatabaseName(), table.getTableName(), tablePath);
+      tableInfo.setTablePath(tablePath);
+      cache.put(tablePath, tableInfo);
+      return tableInfo;
+    }
+  }
+
+  public List<TableDescriptor> listTable() throws CarbonException {
+    throw new UnsupportedOperationException();
+  }
+
+  public TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException {
+    throw new UnsupportedOperationException();
+  }
+
+  public void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException {
+    throw new UnsupportedOperationException();
+  }
+
+  public String getTablePath(String tableName, String databaseName) {
+    return getTablePath(tableName, databaseName, storeConf);
+  }
+
+  public static String getTablePath(String tableName, String databaseName, StoreConf storeConf) {
+    Objects.requireNonNull(tableName);
+    Objects.requireNonNull(databaseName);
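+    // Note: databaseName is validated but not yet encoded in the path, so
+    // equally named tables in different databases share one table path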
+    return String.format("%s/%s", storeConf.storeLocation(), tableName);
+  }
+}
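
For reference (not part of this patch), how the paths above resolve, assuming
a store location of "/carbonstore" (a placeholder value) in storeConf:

    String tablePath = MetaOperation.getTablePath("t1", "default", storeConf);
    // => "/carbonstore/t1" (databaseName is not encoded in the path yet)
    String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
    // => "/carbonstore/t1/Metadata/schema"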

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
deleted file mode 100644
index 6d03711..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaProcessor.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.TableSchema;
-import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.ThriftWriter;
-import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-class MetaProcessor {
-
-  private static LogService LOGGER =
-      LogServiceFactory.getLogService(MetaProcessor.class.getCanonicalName());
-
-  private CarbonStoreBase store;
-
-  MetaProcessor(CarbonStoreBase store) {
-    this.store = store;
-  }
-
-  // mapping of table path to CarbonTable object
-  private Map<String, CarbonTable> cache = new HashMap<>();
-
-  public void createTable(TableDescriptor descriptor) throws StoreException {
-    Field[] fields = descriptor.getSchema().getFields();
-    // sort_columns
-    List<String> sortColumnsList = null;
-    try {
-      sortColumnsList = descriptor.getSchema().prepareSortColumns(descriptor.getProperties());
-    } catch (MalformedCarbonCommandException e) {
-      throw new StoreException(e.getMessage());
-    }
-    ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
-
-    TableSchemaBuilder builder = TableSchema.builder();
-    CarbonWriterBuilder.buildTableSchema(fields, builder, sortColumnsList, sortColumnsSchemaList);
-
-    TableSchema schema = builder.tableName(descriptor.getTable().getTableName())
-        .properties(descriptor.getProperties())
-        .setSortColumns(Arrays.asList(sortColumnsSchemaList))
-        .build();
-
-    SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
-    schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
-    schema.getSchemaEvolution().getSchemaEvolutionEntryList().add(schemaEvolutionEntry);
-    schema.setTableName(descriptor.getTable().getTableName());
-
-    String tablePath = descriptor.getTablePath();
-    if (tablePath == null) {
-      tablePath = store.getTablePath(
-          descriptor.getTable().getTableName(), descriptor.getTable().getDatabaseName());
-    }
-
-    TableInfo tableInfo = CarbonTable.builder()
-        .databaseName(descriptor.getTable().getDatabaseName())
-        .tableName(descriptor.getTable().getTableName())
-        .tablePath(tablePath)
-        .tableSchema(schema)
-        .isTransactionalTable(true)
-        .buildTableInfo();
-
-    try {
-      createTable(tableInfo, descriptor.isIfNotExists());
-    } catch (IOException e) {
-      LOGGER.error(e, "create tableDescriptor failed");
-      throw new StoreException(e.getMessage());
-    }
-  }
-
-  private void createTable(TableInfo tableInfo, boolean ifNotExists) throws IOException {
-    AbsoluteTableIdentifier identifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
-    boolean tableExists = FileFactory.isFileExist(identifier.getTablePath());
-    if (tableExists) {
-      if (ifNotExists) {
-        return;
-      } else {
-        throw new IOException(
-            "car't create table " + tableInfo.getDatabaseName() + "." + tableInfo.getFactTable()
-                .getTableName() + ", because it already exists");
-      }
-    }
-
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    String databaseName = tableInfo.getDatabaseName();
-    String tableName = tableInfo.getFactTable().getTableName();
-    org.apache.carbondata.format.TableInfo thriftTableInfo =
-        schemaConverter.fromWrapperToExternalTableInfo(tableInfo, databaseName, tableName);
-
-    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
-    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
-    FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
-    try {
-      if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-        boolean isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType);
-        if (!isDirCreated) {
-          throw new IOException("Failed to create the metadata directory " + schemaMetadataPath);
-        }
-      }
-      ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
-      thriftWriter.open(FileWriteOperation.OVERWRITE);
-      thriftWriter.write(thriftTableInfo);
-      thriftWriter.close();
-    } catch (IOException e) {
-      LOGGER.error(e, "Failed to handle create table");
-      throw e;
-    }
-  }
-
-  public void dropTable(TableIdentifier table) throws IOException {
-    String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName());
-    cache.remove(tablePath);
-    FileFactory.deleteFile(tablePath);
-  }
-
-  public CarbonTable getTable(TableIdentifier table) throws IOException {
-    String tablePath = store.getTablePath(table.getTableName(), table.getDatabaseName());
-    if (cache.containsKey(tablePath)) {
-      return cache.get(tablePath);
-    } else {
-      org.apache.carbondata.format.TableInfo formatTableInfo =
-          CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(tablePath));
-      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      TableInfo tableInfo = schemaConverter.fromExternalToWrapperTableInfo(
-          formatTableInfo, table.getDatabaseName(), table.getTableName(), tablePath);
-      tableInfo.setTablePath(tablePath);
-      CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
-      cache.put(tablePath, carbonTable);
-      return carbonTable;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java
new file mode 100644
index 0000000..8125426
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/RemoteDataScanner.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.DataScanner;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+import org.apache.carbondata.store.impl.service.model.ScanResponse;
+
+/**
+ * This scanner sends the scan request as an RPC to a remote Worker.
+ * @param <T> scan output row type
+ */
+public class RemoteDataScanner<T> implements DataScanner<T> {
+
+  private TableInfo tableInfo;
+  private ScanDescriptor scanDescriptor;
+  private Map<String, String> scanOption;
+  private CarbonReadSupport<T> readSupport;
+
+  RemoteDataScanner(TableInfo tableInfo, ScanDescriptor scanDescriptor,
+      Map<String, String> scanOption, CarbonReadSupport<T> readSupport) {
+    this.tableInfo = tableInfo;
+    this.scanDescriptor = scanDescriptor;
+    this.scanOption = scanOption;
+    this.readSupport = readSupport;
+  }
+
+  @Override
+  public Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException {
+    List<CarbonInputSplit> toBeScan = new ArrayList<>();
+    if (input instanceof BlockScanUnit) {
+      toBeScan.add(((BlockScanUnit) input).getInputSplit());
+    } else {
+      throw new CarbonException(input.getClass().getName() + " is not supported");
+    }
+    int queryId = new Random().nextInt();
+    CarbonMultiBlockSplit split = new CarbonMultiBlockSplit(toBeScan, input.preferredLocations());
+    try {
+      ScanRequest request = new ScanRequest(queryId, split, tableInfo,
+          scanDescriptor.getProjection(), scanDescriptor.getFilter(), scanDescriptor.getLimit());
+      DataService dataService =
+          DataServicePool.getOrCreateDataService(((BlockScanUnit) input).getSchedulable());
+      ScanResponse response = dataService.scan(request);
+      List<T> rows = Arrays.stream(response.getRows())
+          .map(readSupport::readRow)
+          .collect(Collectors.toList());
+
+      return Collections.singletonList(new RowMajorResultBatch<>(rows)).iterator();
+    } catch (IOException e) {
+      throw new CarbonException(e);
+    }
+  }
+}
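
A minimal pass-through CarbonReadSupport sketch (editorial, not in this patch)
for callers that want raw Object[] rows back. readRow() is the method used
above; the initialize()/close() signatures are assumptions based on the
hadoop-module interface of this era:

    public class PassThroughReadSupport implements CarbonReadSupport<Object[]> {
      @Override
      public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) {
        // nothing to prepare, rows are returned as-is
      }

      @Override
      public Object[] readRow(Object[] data) {
        return data;
      }

      @Override
      public void close() {
        // no resources held
      }
    }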

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java b/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java
new file mode 100644
index 0000000..5fc485e
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/RemotePruner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.Pruner;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.impl.service.PruneService;
+import org.apache.carbondata.store.impl.service.ServiceFactory;
+import org.apache.carbondata.store.impl.service.model.PruneRequest;
+import org.apache.carbondata.store.impl.service.model.PruneResponse;
+
+public class RemotePruner implements Pruner {
+
+  private String pruneServiceHost;
+  private int pruneServicePort;
+
+  RemotePruner(String pruneServiceHost, int pruneServicePort) {
+    this.pruneServiceHost = pruneServiceHost;
+    this.pruneServicePort = pruneServicePort;
+  }
+
+  @Override
+  public List<ScanUnit> prune(TableIdentifier table, Expression filterExpression)
+      throws CarbonException {
+    try {
+      PruneRequest request = new PruneRequest(table, filterExpression);
+      PruneService pruneService = ServiceFactory.createPruneService(
+          pruneServiceHost, pruneServicePort);
+      PruneResponse response = pruneService.prune(request);
+      return response.getScanUnits();
+    } catch (IOException e) {
+      throw new CarbonException(e);
+    }
+  }
+}
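
A usage sketch (not part of this patch), run from inside this package since
the constructor is package-private; host and port values are placeholders:

    Pruner pruner = new RemotePruner("master-host", 10020);
    List<ScanUnit> units = pruner.prune(table, filterExpression);
    // hand each ScanUnit to a DataScanner, e.g. RemoteDataScanner.scan(unit)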

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java b/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java
new file mode 100644
index 0000000..1999a63
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/RowMajorResultBatch.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.store.devapi.ResultBatch;
+
+public class RowMajorResultBatch<T> implements ResultBatch<T> {
+
+  private final Iterator<T> iterator;
+
+  RowMajorResultBatch(List<T> rows) {
+    Objects.requireNonNull(rows);
+    this.iterator = rows.iterator();
+  }
+
+  @Override
+  public boolean isColumnar() {
+    return false;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public T next() {
+    return iterator.next();
+  }
+}
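
For illustration (not part of this patch): RowMajorResultBatch is a thin row
iterator over a prepared list, constructed from inside this package since the
constructor is package-private:

    ResultBatch<String> batch = new RowMajorResultBatch<>(Arrays.asList("a", "b"));
    while (batch.hasNext()) {
      System.out.println(batch.next()); // prints "a", then "b"
    }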

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
new file mode 100644
index 0000000..6e29b48
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class Schedulable implements Writable {
+
+  private String id;
+  private String address;
+  private int port;
+  private int cores;
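+  // used by the Scheduler inside the Master to track load; not serialized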
+  public AtomicInteger workload;
+
+  public Schedulable() {
+  }
+
+  public Schedulable(String id, String address, int port, int cores) {
+    this.id = id;
+    this.address = address;
+    this.port = port;
+    this.cores = cores;
+    this.workload = new AtomicInteger();
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public void setAddress(String address) {
+    this.address = address;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public int getCores() {
+    return cores;
+  }
+
+  @Override
+  public String toString() {
+    return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port
+        + '}';
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(id);
+    out.writeUTF(address);
+    out.writeInt(port);
+    out.writeInt(cores);
+    // workload is intentionally not written: it is only useful to the
+    // Scheduler inside the Master, and clients of the Master do not need it
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readUTF();
+    address = in.readUTF();
+    port = in.readInt();
+    cores = in.readInt();
+    // re-initialize workload so a deserialized instance is safe to schedule
+    workload = new AtomicInteger();
+  }
+}
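
A Writable round-trip sketch for Schedulable (editorial, not in this patch),
with placeholder values; java.io stream imports and IOException handling are
assumed to be provided by the caller:

    Schedulable in = new Schedulable("worker-1", "10.0.0.5", 10021, 8);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    in.write(new DataOutputStream(bytes));
    Schedulable out = new Schedulable();
    out.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    // out now mirrors id/address/port/cores; workload is re-initialized fresh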