You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/08/06 11:04:52 UTC
[5/5] carbondata git commit: [CARBONDATA-2825][CARBONDATA-2828]
CarbonStore and InternalCarbonStore API This closes #2589
[CARBONDATA-2825][CARBONDATA-2828] CarbonStore and InternalCarbonStore API
This closes #2589
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9f10122a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9f10122a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9f10122a
Branch: refs/heads/carbonstore
Commit: 9f10122af65fcd518bfe8855c4eaa1a423e81caa
Parents: a6027ae
Author: Jacky Li <ja...@qq.com>
Authored: Wed Aug 1 02:16:26 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Aug 6 19:03:12 2018 +0800
----------------------------------------------------------------------
.../common/annotations/InterfaceStability.java | 2 +-
.../core/datastore/row/CarbonRow.java | 30 ++-
.../core/metadata/schema/table/TableInfo.java | 2 +-
.../carbondata/hadoop/CarbonInputSplit.java | 44 +++-
.../hadoop/api/CarbonInputFormat.java | 28 ++-
.../hadoop/api/CarbonTableInputFormat.java | 11 +
.../hadoop/readsupport/CarbonReadSupport.java | 3 +-
.../apache/carbondata/store/WorkerManager.scala | 4 +-
.../org/apache/spark/sql/CarbonSession.scala | 22 +-
.../carbondata/zeppelin/TestCarbonResponse.java | 4 +-
store/conf/store.conf | 4 +-
store/core/pom.xml | 4 +-
.../carbondata/store/api/CarbonStore.java | 32 ---
.../store/api/CarbonStoreFactory.java | 93 --------
.../apache/carbondata/store/api/DataStore.java | 51 ----
.../apache/carbondata/store/api/MetaStore.java | 50 ----
.../apache/carbondata/store/api/SqlStore.java | 34 ---
.../carbondata/store/api/conf/StoreConf.java | 197 ----------------
.../store/api/descriptor/LoadDescriptor.java | 114 ---------
.../store/api/descriptor/SelectDescriptor.java | 111 ---------
.../store/api/descriptor/TableDescriptor.java | 174 --------------
.../store/api/descriptor/TableIdentifier.java | 37 ---
.../exception/ExecutionTimeoutException.java | 22 --
.../store/api/exception/SchedulerException.java | 26 ---
.../store/api/exception/StoreException.java | 33 ---
.../carbondata/store/devapi/DataLoader.java | 49 ++++
.../carbondata/store/devapi/DataScanner.java | 51 ++++
.../store/devapi/InternalCarbonStore.java | 72 ++++++
.../devapi/InternalCarbonStoreFactory.java | 45 ++++
.../apache/carbondata/store/devapi/Pruner.java | 46 ++++
.../carbondata/store/devapi/ResultBatch.java | 47 ++++
.../carbondata/store/devapi/ScanOption.java | 69 ++++++
.../carbondata/store/devapi/ScanUnit.java | 41 ++++
.../apache/carbondata/store/devapi/Scanner.java | 33 +++
.../store/devapi/TransactionalOperation.java | 35 +++
.../carbondata/store/impl/BlockScanUnit.java | 74 ++++++
.../carbondata/store/impl/CarbonStoreBase.java | 177 --------------
.../carbondata/store/impl/DataOperation.java | 95 ++++++++
.../carbondata/store/impl/DataServicePool.java | 41 ++++
.../carbondata/store/impl/DelegatedScanner.java | 57 +++++
.../store/impl/DistributedCarbonStore.java | 232 -------------------
.../carbondata/store/impl/IndexOperation.java | 61 +++++
.../store/impl/IndexedRecordReader.java | 2 +-
.../store/impl/InternalCarbonStoreImpl.java | 110 +++++++++
.../carbondata/store/impl/LocalCarbonStore.java | 95 +++++---
.../carbondata/store/impl/LocalDataScanner.java | 73 ++++++
.../carbondata/store/impl/LocalPruner.java | 58 +++++
.../carbondata/store/impl/MetaOperation.java | 212 +++++++++++++++++
.../carbondata/store/impl/MetaProcessor.java | 170 --------------
.../store/impl/RemoteDataScanner.java | 87 +++++++
.../carbondata/store/impl/RemotePruner.java | 56 +++++
.../store/impl/RowMajorResultBatch.java | 49 ++++
.../carbondata/store/impl/Schedulable.java | 100 ++++++++
.../carbondata/store/impl/master/Master.java | 172 +++++++++++---
.../store/impl/master/PruneServiceImpl.java | 89 +++++++
.../store/impl/master/RegistryServiceImpl.java | 8 +-
.../store/impl/master/Schedulable.java | 76 ------
.../carbondata/store/impl/master/Scheduler.java | 71 +-----
.../store/impl/master/StoreServiceImpl.java | 100 ++++++++
.../store/impl/rpc/RegistryService.java | 32 ---
.../store/impl/rpc/ServiceFactory.java | 43 ----
.../carbondata/store/impl/rpc/StoreService.java | 40 ----
.../store/impl/rpc/model/BaseResponse.java | 69 ------
.../store/impl/rpc/model/LoadDataRequest.java | 60 -----
.../store/impl/rpc/model/QueryResponse.java | 73 ------
.../impl/rpc/model/RegisterWorkerRequest.java | 73 ------
.../impl/rpc/model/RegisterWorkerResponse.java | 54 -----
.../carbondata/store/impl/rpc/model/Scan.java | 108 ---------
.../store/impl/rpc/model/ShutdownRequest.java | 53 -----
.../store/impl/rpc/model/ShutdownResponse.java | 61 -----
.../store/impl/service/DataService.java | 53 +++++
.../store/impl/service/PruneService.java | 41 ++++
.../store/impl/service/RegistryService.java | 32 +++
.../store/impl/service/ServiceFactory.java | 50 ++++
.../store/impl/service/model/BaseResponse.java | 69 ++++++
.../impl/service/model/LoadDataRequest.java | 60 +++++
.../store/impl/service/model/PruneRequest.java | 64 +++++
.../store/impl/service/model/PruneResponse.java | 67 ++++++
.../service/model/RegisterWorkerRequest.java | 73 ++++++
.../service/model/RegisterWorkerResponse.java | 54 +++++
.../store/impl/service/model/ScanRequest.java | 108 +++++++++
.../store/impl/service/model/ScanResponse.java | 73 ++++++
.../impl/service/model/ShutdownRequest.java | 53 +++++
.../impl/service/model/ShutdownResponse.java | 61 +++++
.../store/impl/worker/DataServiceImpl.java | 174 ++++++++++++++
.../store/impl/worker/RequestHandler.java | 166 -------------
.../store/impl/worker/StoreServiceImpl.java | 77 ------
.../carbondata/store/impl/worker/Worker.java | 26 ++-
.../apache/carbondata/store/util/StoreUtil.java | 134 -----------
.../store/DistributedCarbonStoreTest.java | 59 +++--
.../carbondata/store/LocalCarbonStoreTest.java | 39 ++--
.../org/apache/carbondata/store/TestUtil.java | 10 -
.../horizon/rest/client/HorizonClient.java | 10 +-
.../rest/client/impl/SimpleHorizonClient.java | 12 +-
.../horizon/rest/controller/Horizon.java | 3 +-
.../rest/controller/HorizonController.java | 68 +++---
.../rest/model/validate/RequestValidator.java | 38 +--
.../rest/model/view/CreateTableRequest.java | 4 +-
.../horizon/rest/model/view/FieldRequest.java | 2 +-
.../horizon/rest/model/view/LoadRequest.java | 4 +-
.../horizon/rest/model/view/SelectResponse.java | 11 +-
.../apache/carbondata/horizon/HorizonTest.java | 10 +-
store/sdk/pom.xml | 4 +-
.../carbondata/sdk/file/AvroCarbonWriter.java | 50 ++--
.../sdk/file/CarbonWriterBuilder.java | 4 +-
.../org/apache/carbondata/sdk/file/Field.java | 15 +-
.../org/apache/carbondata/sdk/file/Schema.java | 3 +-
.../carbondata/sdk/store/CarbonStore.java | 151 ++++++++++++
.../sdk/store/CarbonStoreFactory.java | 93 ++++++++
.../sdk/store/DistributedCarbonStore.java | 129 +++++++++++
.../apache/carbondata/sdk/store/KeyedRow.java | 45 ++++
.../apache/carbondata/sdk/store/PrimaryKey.java | 26 +++
.../org/apache/carbondata/sdk/store/Row.java | 27 +++
.../carbondata/sdk/store/conf/StoreConf.java | 207 +++++++++++++++++
.../sdk/store/descriptor/LoadDescriptor.java | 149 ++++++++++++
.../sdk/store/descriptor/ScanDescriptor.java | 151 ++++++++++++
.../sdk/store/descriptor/TableDescriptor.java | 214 +++++++++++++++++
.../sdk/store/descriptor/TableIdentifier.java | 63 +++++
.../sdk/store/exception/CarbonException.java | 38 +++
.../exception/ExecutionTimeoutException.java | 27 +++
.../sdk/store/exception/SchedulerException.java | 31 +++
.../sdk/store/service/ServiceFactory.java | 41 ++++
.../sdk/store/service/StoreService.java | 53 +++++
.../carbondata/sdk/store/util/StoreUtil.java | 134 +++++++++++
.../rest/controller/SqlHorizonController.java | 8 +-
.../rest/model/validate/RequestValidator.java | 8 +-
126 files changed, 4912 insertions(+), 3015 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
index 5435028..afd863f 100644
--- a/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
+++ b/common/src/main/java/org/apache/carbondata/common/annotations/InterfaceStability.java
@@ -42,7 +42,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience.*;
* </ul>
*/
@InterfaceAudience.User
-@org.apache.hadoop.classification.InterfaceStability.Evolving
+@InterfaceStability.Evolving
public class InterfaceStability {
/**
* Can evolve while retaining compatibility for minor release boundaries.;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index 48775d4..1f1f087 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -17,13 +17,21 @@
package org.apache.carbondata.core.datastore.row;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
+
+import org.apache.hadoop.io.WritableUtils;
+
/**
* This row class is used to transfer the row data from one step to other step
*/
-public class CarbonRow implements Serializable {
+public class CarbonRow implements Serializable, Writable {
private Object[] data;
@@ -87,4 +95,24 @@ public class CarbonRow implements Serializable {
public void clearData() {
this.data = null;
}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(data));
+ WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rawData));
+ out.writeShort(rangeId);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ try {
+ data = (Object[]) ObjectSerializationUtil.deserialize(
+ WritableUtils.readCompressedByteArray(in));
+ rawData = (Object[]) ObjectSerializationUtil.deserialize(
+ WritableUtils.readCompressedByteArray(in));
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ rangeId = in.readShort();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 46328f7..abe1810 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTable
* Store the information about the table.
* it stores the fact table as well as aggregate table present in the schema
*/
-public class TableInfo implements Serializable, Writable {
+public class TableInfo implements Serializable, Writable, org.apache.hadoop.io.Writable {
private static final LogService LOGGER =
LogServiceFactory.getLogService(TableInfo.class.getName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 405ff53..dd3e63f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.internal.index.Block;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -56,6 +57,11 @@ public class CarbonInputSplit extends FileSplit
private Segment segment;
+ // We use this filePath to store the block location instead of the
+ // filePath in FileSplit, because filePath in FileSplit is not Serializable
+ // before Hadoop 3, see HADOOP-13519
+ private String filePath;
+
private String bucketId;
private String blockletId;
@@ -98,6 +104,7 @@ public class CarbonInputSplit extends FileSplit
numberOfBlocklets = 0;
invalidSegments = new ArrayList<>();
version = CarbonProperties.getInstance().getFormatVersion();
+ filePath = "";
}
private CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
@@ -116,6 +123,7 @@ public class CarbonInputSplit extends FileSplit
this.version = version;
this.deleteDeltaFiles = deleteDeltaFiles;
this.dataMapWritePath = dataMapWritePath;
+ this.filePath = path.toString();
}
public CarbonInputSplit(String segmentId, String blockletId, Path path, long start, long length,
@@ -136,6 +144,7 @@ public class CarbonInputSplit extends FileSplit
numberOfBlocklets = 0;
invalidSegments = new ArrayList<>();
version = CarbonProperties.getInstance().getFormatVersion();
+ filePath = path.toString();
}
public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
@@ -149,6 +158,7 @@ public class CarbonInputSplit extends FileSplit
numberOfBlocklets = 0;
invalidSegments = new ArrayList<>();
version = CarbonProperties.getInstance().getFormatVersion();
+ filePath = path.toString();
}
/**
@@ -252,10 +262,24 @@ public class CarbonInputSplit extends FileSplit
if (dataMapWriterPathExists) {
dataMapWritePath = in.readUTF();
}
+ boolean filePathExists = in.readBoolean();
+ if (filePathExists) {
+ filePath = in.readUTF();
+ } else {
+ filePath = super.getPath().toString();
+ }
}
@Override public void write(DataOutput out) throws IOException {
- super.write(out);
+ if (super.getPath() != null) {
+ super.write(out);
+ } else {
+ // see HADOOP-13519, after Java deserialization, super.filePath is
+ // null, so write our filePath instead
+ Text.writeString(out, filePath);
+ out.writeLong(getStart());
+ out.writeLong(getLength());
+ }
out.writeUTF(segment.toString());
out.writeShort(version.number());
out.writeUTF(bucketId);
@@ -278,6 +302,10 @@ public class CarbonInputSplit extends FileSplit
if (dataMapWritePath != null) {
out.writeUTF(dataMapWritePath);
}
+ out.writeBoolean(filePath != null);
+ if (filePath != null) {
+ out.writeUTF(filePath);
+ }
}
public List<String> getInvalidSegments() {
@@ -398,7 +426,7 @@ public class CarbonInputSplit extends FileSplit
}
@Override public String getBlockPath() {
- return getPath().getName();
+ return filePath.substring(filePath.lastIndexOf("/") + 1);
}
@Override public List<Long> getMatchedBlocklets() {
@@ -444,4 +472,16 @@ public class CarbonInputSplit extends FileSplit
public Blocklet makeBlocklet() {
return new Blocklet(getPath().getName(), blockletId);
}
+
+ public String[] preferredLocations() {
+ if (CarbonProperties.isTaskLocality()) {
+ try {
+ return getLocations();
+ } catch (IOException e) {
+ return new String[0];
+ }
+ } else {
+ return new String[0];
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 70c530f..ce0dc72 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -185,9 +185,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
/**
* It sets unresolved filter expression.
*
- * @param configuration
- * @para DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
-m filterExpression
+ * @param configuration Hadoop conf
+ * @param filterExpression filter expression
*/
public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
if (filterExpression == null) {
@@ -245,6 +244,17 @@ m filterExpression
return configuration.get(COLUMN_PROJECTION);
}
+ public static String[] getProjectionColumns(Configuration configuration) {
+ String projectionString = getColumnProjection(configuration);
+ String[] projectColumns;
+ if (projectionString != null) {
+ projectColumns = projectionString.split(",");
+ } else {
+ projectColumns = new String[]{};
+ }
+ return projectColumns;
+ }
+
public static void setFgDataMapPruning(Configuration configuration, boolean enable) {
configuration.set(FGDATAMAP_PRUNING, String.valueOf(enable));
}
@@ -353,7 +363,7 @@ m filterExpression
*/
@Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
- protected Expression getFilterPredicates(Configuration configuration) {
+ public static Expression getFilterPredicates(Configuration configuration) {
try {
String filterExprString = configuration.get(FILTER_PREDICATE);
if (filterExprString == null) {
@@ -524,7 +534,7 @@ m filterExpression
return prunedBlocklets;
}
- private List<ExtendedBlocklet> getPrunedFiles4ExternalFormat(JobContext job,
+ public List<ExtendedBlocklet> getPrunedFiles4ExternalFormat(JobContext job,
CarbonTable carbonTable,
FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException {
ExplainCollector.addPruningInfo(carbonTable.getTableName());
@@ -664,13 +674,7 @@ m filterExpression
CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
// set projection column in the query model
- String projectionString = getColumnProjection(configuration);
- String[] projectColumns;
- if (projectionString != null) {
- projectColumns = projectionString.split(",");
- } else {
- projectColumns = new String[]{};
- }
+ String[] projectColumns = getProjectionColumns(configuration);
QueryModel queryModel = new QueryModelBuilder(carbonTable)
.projectColumns(projectColumns)
.filterExpression(getFilterPredicates(configuration))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 84e36e3..267ba0f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -71,7 +71,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
/**
* InputFormat for reading carbondata files with table level metadata support,
@@ -98,6 +100,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
private CarbonTable carbonTable;
private ReadCommittedScope readCommittedScope;
+ public CarbonTableInputFormat() {
+ }
+
+ public CarbonTableInputFormat(Configuration conf) throws IOException {
+ this.carbonTable = getOrCreateCarbonTable(conf);
+ this.readCommittedScope = getReadCommitted(
+ new JobContextImpl(conf, new JobID()), carbonTable.getAbsoluteTableIdentifier());
+ }
+
/**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
index c126e95..78a2e7f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.hadoop.readsupport;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -24,7 +25,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
/**
* This is the interface to convert data reading from RecordReader to row representation.
*/
-public interface CarbonReadSupport<T> {
+public interface CarbonReadSupport<T> extends Serializable {
/**
* Initialization if needed based on the projected column list
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
index 7fff2e5..c624702 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/store/WorkerManager.scala
@@ -25,9 +25,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.store.CarbonStoreFactory
+import org.apache.carbondata.sdk.store.conf.StoreConf
import org.apache.carbondata.spark.util.Util
-import org.apache.carbondata.store.api.CarbonStoreFactory
-import org.apache.carbondata.store.api.conf.StoreConf
import org.apache.carbondata.store.impl.worker.Worker
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 6c13955..3fccfef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -22,9 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
@@ -35,18 +33,16 @@ import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.profiler.{Profiler, SQLStart}
-import org.apache.spark.util.{CarbonReflectionUtils, Utils}
+import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.common.annotations.InterfaceAudience
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.sdk.store.{CarbonStore, CarbonStoreFactory}
+import org.apache.carbondata.sdk.store.conf.StoreConf
+import org.apache.carbondata.sdk.store.descriptor.{ScanDescriptor, TableIdentifier}
import org.apache.carbondata.store.WorkerManager
-import org.apache.carbondata.store.api.{CarbonStore, CarbonStoreFactory}
-import org.apache.carbondata.store.api.conf.StoreConf
-import org.apache.carbondata.store.api.descriptor.{SelectDescriptor, TableIdentifier => CTableIdentifier}
-import org.apache.carbondata.streaming.CarbonStreamingQueryListener
/**
* Session implementation for {org.apache.spark.sql.SparkSession}
@@ -204,7 +200,7 @@ class CarbonSession(@transient val sc: SparkContext,
val storeConf = new StoreConf()
storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath)
storeConf.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost.getHostAddress)
- storeConf.conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort)
+ storeConf.conf(StoreConf.REGISTRY_PORT, CarbonProperties.getSearchMasterPort)
storeConf.conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost.getHostAddress)
storeConf.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort)
storeConf.conf(StoreConf.WORKER_CORE_NUM, 2)
@@ -241,13 +237,13 @@ class CarbonSession(@transient val sc: SparkContext,
maxRows: Option[Long] = None,
localMaxRows: Option[Long] = None): DataFrame = {
val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
- val select = new SelectDescriptor(
- new CTableIdentifier(table.getTableName, table.getDatabaseName),
+ val select = new ScanDescriptor(
+ new TableIdentifier(table.getTableName, table.getDatabaseName),
columns.map(_.name).toArray,
if (expr != null) CarbonFilters.transformExpression(expr) else null,
localMaxRows.getOrElse(Long.MaxValue)
)
- val rows = store.select(select).iterator()
+ val rows = store.scan(select).iterator()
val output = new java.util.ArrayList[Row]()
val maxRowCount = maxRows.getOrElse(Long.MaxValue)
var rowCount = 0
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
----------------------------------------------------------------------
diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
index 4b169f4..1f1ce65 100644
--- a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
+++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
@@ -76,7 +76,7 @@ public class TestCarbonResponse {
" \"timestamp\": 1531884083849,\n" +
" \"status\": 500,\n" +
" \"error\": \"Internal Server Error\",\n" +
- " \"exception\": \"org.apache.carbondata.store.api.exception.StoreException\",\n" +
+ " \"exception\": \"org.apache.carbondata.store.api.exception.CarbonException\",\n" +
" \"message\": \"org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " +
"Table or view 'sinka6' already exists in database 'default';\",\n" +
" \"path\": \"/table/sql\"\n" +
@@ -84,7 +84,7 @@ public class TestCarbonResponse {
CarbonResponse errorResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get();
assertEquals("org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " +
"Table or view 'sinka6' already exists in database 'default';", errorResponse.getMessage());
- assertEquals("org.apache.carbondata.store.api.exception.StoreException", errorResponse.getException());
+ assertEquals("org.apache.carbondata.store.api.exception.CarbonException", errorResponse.getException());
assertEquals(1531884083849L, errorResponse.getTimestamp());
assertEquals("Internal Server Error", errorResponse.getError());
assertEquals(500, errorResponse.getStatus());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/conf/store.conf
----------------------------------------------------------------------
diff --git a/store/conf/store.conf b/store/conf/store.conf
index 7f18076..9902061 100644
--- a/store/conf/store.conf
+++ b/store/conf/store.conf
@@ -6,5 +6,7 @@ carbon.store.temp.location=/tmp/carbon.store.temp
# worker and master
carbon.master.host=127.0.0.1
-carbon.master.port=10020
+carbon.master.registry.port=10020
+carbon.master.prune.port=10120
+carbon.master.store.port=9020
carbon.store.location=/tmp/carbon.store
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/pom.xml
----------------------------------------------------------------------
diff --git a/store/core/pom.xml b/store/core/pom.xml
index 44d5ab1..c9e498d 100644
--- a/store/core/pom.xml
+++ b/store/core/pom.xml
@@ -48,8 +48,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
deleted file mode 100644
index 3525389..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStore.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.Closeable;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-
-/**
- * Public Interface of CarbonStore
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface CarbonStore extends MetaStore, DataStore, Closeable {
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java b/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
deleted file mode 100644
index 76ef450..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/CarbonStoreFactory.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-public class CarbonStoreFactory {
- private static Map<String, CarbonStore> distributedStores = new ConcurrentHashMap<>();
- private static Map<String, CarbonStore> localStores = new ConcurrentHashMap<>();
-
- private CarbonStoreFactory() {
- }
-
- public static CarbonStore getDistributedStore(String storeName, StoreConf storeConf)
- throws StoreException {
- if (distributedStores.containsKey(storeName)) {
- return distributedStores.get(storeName);
- }
-
- // create a new instance
- try {
- String className = "org.apache.carbondata.store.impl.DistributedCarbonStore";
- CarbonStore store = createCarbonStore(storeConf, className);
- distributedStores.put(storeName, store);
- return store;
- } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException |
- InstantiationException e) {
- throw new StoreException(e);
- }
- }
-
- public static void removeDistributedStore(String storeName) throws IOException {
- if (distributedStores.containsKey(storeName)) {
- distributedStores.get(storeName).close();
- distributedStores.remove(storeName);
- }
- }
-
- public static CarbonStore getLocalStore(String storeName, StoreConf storeConf)
- throws StoreException {
- if (localStores.containsKey(storeName)) {
- return localStores.get(storeName);
- }
-
- // create a new instance
- try {
- String className = "org.apache.carbondata.store.impl.LocalCarbonStore";
- CarbonStore store = createCarbonStore(storeConf, className);
- localStores.put(storeName, store);
- return store;
- } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException |
- InstantiationException e) {
- throw new StoreException(e);
- }
- }
-
- public static void removeLocalStore(String storeName) throws IOException {
- if (localStores.containsKey(storeName)) {
- localStores.get(storeName).close();
- localStores.remove(storeName);
- }
- }
-
- private static CarbonStore createCarbonStore(StoreConf storeConf, String className)
- throws ClassNotFoundException, InstantiationException, IllegalAccessException,
- InvocationTargetException {
- Constructor[] constructor = Class.forName(className).getDeclaredConstructors();
- constructor[0].setAccessible(true);
- return (CarbonStore) constructor[0].newInstance(storeConf);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
deleted file mode 100644
index d35c133..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/DataStore.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-/**
- * Public interface to write and read data in CarbonStore
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface DataStore {
-
- /**
- * Load data into a Table
- * @param load descriptor for load operation
- * @throws IOException if network or disk IO error occurs
- */
- void loadData(LoadDescriptor load) throws IOException, StoreException;
-
- /**
- * Scan a Table and return matched rows
- * @param select descriptor for scan operation, including required column, filter, etc
- * @return matched rows
- * @throws IOException if network or disk IO error occurs
- */
- List<CarbonRow> select(SelectDescriptor select) throws IOException, StoreException;
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
deleted file mode 100644
index dea6873..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/MetaStore.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
-
-/**
- * Public interface to manage table in CarbonStore
- */
-@InterfaceAudience.User
-@InterfaceStability.Unstable
-public interface MetaStore {
- /**
- * Create a Table
- * @param table descriptor for create table operation
- * @throws IOException if network or disk IO error occurs
- */
- void createTable(TableDescriptor table) throws IOException, StoreException;
-
- /**
- * Drop a Table, and remove all data in it
- * @param table table identifier
- * @throws IOException if network or disk IO error occurs
- */
- void dropTable(TableIdentifier table) throws IOException;
-
- CarbonTable getTable(TableIdentifier table) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java b/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
deleted file mode 100644
index 3f52eed..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/SqlStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-
-public interface SqlStore {
-
- /**
- * Executor a SQL statement
- * @param sqlString SQL statement
- * @return matched rows
- * @throws IOException if network or disk IO error occurs
- */
- List<CarbonRow> sql(String sqlString) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java b/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
deleted file mode 100644
index 5e4bb4a..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/conf/StoreConf.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.conf;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-public class StoreConf implements Serializable, Writable {
-
- public static final String SELECT_PROJECTION = "carbon.select.projection";
- public static final String SELECT_FILTER = "carbon.select.filter";
- public static final String SELECT_LIMIT = "carbon.select.limit";
-
- public static final String SELECT_ID = "carbon.select.id";
-
- public static final String WORKER_HOST = "carbon.worker.host";
- public static final String WORKER_PORT = "carbon.worker.port";
- public static final String WORKER_CORE_NUM = "carbon.worker.core.num";
- public static final String MASTER_HOST = "carbon.master.host";
- public static final String MASTER_PORT = "carbon.master.port";
-
- public static final String STORE_TEMP_LOCATION = "carbon.store.temp.location";
- public static final String STORE_LOCATION = "carbon.store.location";
- public static final String STORE_NAME = "carbon.store.name";
-
- public static final String STORE_CONF_FILE = "carbon.store.confFile";
-
- private Map<String, String> conf = new HashMap<>();
-
- public StoreConf() {
- String storeConfFile = System.getProperty(STORE_CONF_FILE);
- if (storeConfFile != null) {
- load(storeConfFile);
- }
- }
-
- public StoreConf(String storeName, String storeLocation) {
- conf.put(STORE_NAME, storeName);
- conf.put(STORE_LOCATION, storeLocation);
- }
-
- public StoreConf(String confFilePath) {
- load(confFilePath);
- }
-
- public StoreConf conf(String key, String value) {
- conf.put(key, value);
- return this;
- }
-
- public StoreConf conf(String key, int value) {
- conf.put(key, "" + value);
- return this;
- }
-
- public void load(String filePath) {
- StoreUtil.loadProperties(filePath, this);
- }
-
- public void conf(StoreConf conf) {
- this.conf.putAll(conf.conf);
- }
-
- public Object conf(String key) {
- return conf.get(key);
- }
-
- public String[] projection() {
- return stringArrayValue(SELECT_PROJECTION);
- }
-
- public String filter() {
- return stringValue(SELECT_FILTER);
- }
-
- public int limit() {
- return intValue(SELECT_LIMIT);
- }
-
- public String masterHost() {
- return stringValue(MASTER_HOST);
- }
-
- public int masterPort() {
- return intValue(MASTER_PORT);
- }
-
- public String workerHost() {
- return stringValue(WORKER_HOST);
- }
-
- public int workerPort() {
- return intValue(WORKER_PORT);
- }
-
- public int workerCoreNum() {
- return intValue(WORKER_CORE_NUM);
- }
-
- public String storeLocation() {
- return stringValue(STORE_LOCATION);
- }
-
- public String[] storeTempLocation() {
- return stringArrayValue(STORE_TEMP_LOCATION);
- }
-
- public String selectId() {
- return stringValue(SELECT_ID);
- }
-
- public Configuration newHadoopConf() {
- Configuration hadoopConf = FileFactory.getConfiguration();
- for (Map.Entry<String, String> entry : conf.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- if (key != null && value != null && key.startsWith("carbon.hadoop.")) {
- hadoopConf.set(key.substring("carbon.hadoop.".length()), value);
- }
- }
- return hadoopConf;
- }
-
- private String stringValue(String key) {
- Object obj = conf.get(key);
- if (obj == null) {
- return null;
- }
- return obj.toString();
- }
-
- private int intValue(String key) {
- String value = conf.get(key);
- if (value == null) {
- return -1;
- }
- return Integer.parseInt(value);
- }
-
- private String[] stringArrayValue(String key) {
- String value = conf.get(key);
- if (value == null) {
- return null;
- }
- return value.split(",", -1);
- }
-
- @Override public void write(DataOutput out) throws IOException {
- Set<Map.Entry<String, String>> entries = conf.entrySet();
- WritableUtils.writeVInt(out, conf.size());
- for (Map.Entry<String, String> entry : entries) {
- WritableUtils.writeString(out, entry.getKey());
- WritableUtils.writeString(out, entry.getValue());
- }
- }
-
- @Override public void readFields(DataInput in) throws IOException {
- if (conf == null) {
- conf = new HashMap<>();
- }
-
- int size = WritableUtils.readVInt(in);
- String key, value;
- for (int i = 0; i < size; i++) {
- key = WritableUtils.readString(in);
- value = WritableUtils.readString(in);
- conf.put(key, value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
deleted file mode 100644
index c3a4ff7..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/LoadDescriptor.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class LoadDescriptor {
-
- private TableIdentifier table;
- private String inputPath;
- private Map<String, String> options;
- private boolean isOverwrite;
-
- private LoadDescriptor() {
- }
-
- public LoadDescriptor(TableIdentifier table, String inputPath,
- Map<String, String> options, boolean isOverwrite) {
- Objects.requireNonNull(table);
- Objects.requireNonNull(inputPath);
- this.table = table;
- this.inputPath = inputPath;
- this.options = options;
- this.isOverwrite = isOverwrite;
- }
-
- public TableIdentifier getTable() {
- return table;
- }
-
- public void setTable(TableIdentifier table) {
- this.table = table;
- }
-
- public String getInputPath() {
- return inputPath;
- }
-
- public void setInputPath(String inputPath) {
- this.inputPath = inputPath;
- }
-
- public Map<String, String> getOptions() {
- return options;
- }
-
- public void setOptions(Map<String, String> options) {
- this.options = options;
- }
-
- public boolean isOverwrite() {
- return isOverwrite;
- }
-
- public void setOverwrite(boolean overwrite) {
- isOverwrite = overwrite;
- }
-
- public static class Builder {
- private LoadDescriptor load;
- private Map<String, String> options;
-
- private Builder() {
- load = new LoadDescriptor();
- options = new HashMap<>();
- }
-
- public Builder table(TableIdentifier tableIdentifier) {
- load.setTable(tableIdentifier);
- return this;
- }
-
- public Builder overwrite(boolean isOverwrite) {
- load.setOverwrite(isOverwrite);
- return this;
- }
-
- public Builder inputPath(String inputPath) {
- load.setInputPath(inputPath);
- return this;
- }
-
- public Builder options(String key, String value) {
- options.put(key, value);
- return this;
- }
-
- public LoadDescriptor create() {
- load.setOptions(options);
- return load;
- }
- }
-
- public static Builder builder() {
- return new Builder();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
deleted file mode 100644
index c3627a9..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/SelectDescriptor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-import java.util.Objects;
-
-import org.apache.carbondata.core.scan.expression.Expression;
-
-public class SelectDescriptor {
-
- private TableIdentifier table;
- private String[] projection;
- private Expression filter;
- private long limit;
-
- private SelectDescriptor() {
- }
-
- public SelectDescriptor(TableIdentifier table, String[] projection,
- Expression filter, long limit) {
- Objects.requireNonNull(table);
- Objects.requireNonNull(projection);
- this.table = table;
- this.projection = projection;
- this.filter = filter;
- this.limit = limit;
- }
-
- public TableIdentifier getTable() {
- return table;
- }
-
- public void setTable(TableIdentifier table) {
- this.table = table;
- }
-
- public String[] getProjection() {
- return projection;
- }
-
- public void setProjection(String[] projection) {
- this.projection = projection;
- }
-
- public Expression getFilter() {
- return filter;
- }
-
- public void setFilter(Expression filter) {
- this.filter = filter;
- }
-
- public long getLimit() {
- return limit;
- }
-
- public void setLimit(long limit) {
- this.limit = limit;
- }
-
- public static class Builder {
- private SelectDescriptor select;
-
- private Builder() {
- select = new SelectDescriptor();
- }
-
- public Builder table(TableIdentifier tableIdentifier) {
- select.setTable(tableIdentifier);
- return this;
- }
-
- public Builder select(String... columnNames) {
- select.setProjection(columnNames);
- return this;
- }
-
- public Builder filter(Expression filter) {
- select.setFilter(filter);
- return this;
- }
-
- public Builder limit(long limit) {
- select.setLimit(limit);
- return this;
- }
-
- public SelectDescriptor create() {
- return select;
- }
- }
-
- public static Builder builder() {
- return new Builder();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
deleted file mode 100644
index 2d677a8..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableDescriptor.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.sdk.file.Field;
-import org.apache.carbondata.sdk.file.Schema;
-
-public class TableDescriptor {
-
- private boolean ifNotExists;
- private TableIdentifier table;
- private String tablePath;
- private Schema schema;
- private Map<String, String> properties;
- private String comment;
-
- private TableDescriptor() {
- }
-
- public TableDescriptor(TableIdentifier table, Schema schema,
- Map<String, String> properties, String tablePath, String comment, boolean ifNotExists) {
- Objects.requireNonNull(table);
- Objects.requireNonNull(schema);
- this.table = table;
- this.ifNotExists = ifNotExists;
- this.schema = schema;
- this.properties = properties;
- this.tablePath = tablePath;
- this.comment = comment;
- }
-
- public boolean isIfNotExists() {
- return ifNotExists;
- }
-
- public void setIfNotExists(boolean ifNotExists) {
- this.ifNotExists = ifNotExists;
- }
-
- public TableIdentifier getTable() {
- return table;
- }
-
- public void setTable(TableIdentifier table) {
- this.table = table;
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public void setSchema(Schema schema) {
- this.schema = schema;
- }
-
- public Map<String, String> getProperties() {
- return properties;
- }
-
- public void setProperties(Map<String, String> properties) {
- this.properties = properties;
- }
-
- public String getComment() {
- return comment;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
- public void setTablePath(String tablePath) {
- this.tablePath = tablePath;
- }
-
- public String getTablePath() {
- return tablePath;
- }
-
- public static class Builder {
-
- private TableDescriptor table;
- private List<Field> fields;
- private Map<String, String> tblProperties;
-
- private Builder() {
- table = new TableDescriptor();
- fields = new ArrayList<>();
- tblProperties = new HashMap<>();
- }
-
- public Builder ifNotExists() {
- table.setIfNotExists(true);
- return this;
- }
-
- public Builder table(TableIdentifier tableId) {
- table.setTable(tableId);
- return this;
- }
-
- public Builder tablePath(String tablePath) {
- table.setTablePath(tablePath);
- return this;
- }
-
- public Builder comment(String tableComment) {
- table.setComment(tableComment);
- return this;
- }
-
- public Builder column(String name, DataType dataType) {
- fields.add(new Field(name, dataType));
- return this;
- }
-
- public Builder column(String name, DataType dataType, String comment) {
- Field field = new Field(name, dataType);
- field.setColumnComment(comment);
- fields.add(field);
- return this;
- }
-
- public Builder column(String name, DataType dataType, int precision, int scale, String comment)
- {
- Field field = new Field(name, dataType);
- field.setColumnComment(comment);
- field.setScale(scale);
- field.setPrecision(precision);
- fields.add(field);
- return this;
- }
-
- public Builder tblProperties(String key, String value) {
- tblProperties.put(key, value);
- return this;
- }
-
- public TableDescriptor create() {
- Field[] fieldArray = new Field[fields.size()];
- fieldArray = fields.toArray(fieldArray);
- Schema schema = new Schema(fieldArray);
- table.setSchema(schema);
- table.setProperties(tblProperties);
- return table;
- }
- }
-
- public static Builder builder() {
- return new Builder();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java b/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
deleted file mode 100644
index ab8edf8..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/descriptor/TableIdentifier.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.descriptor;
-
-public class TableIdentifier {
- private String tableName;
- private String databaseName;
-
- public TableIdentifier(String tableName, String databaseName) {
- this.tableName = tableName;
- this.databaseName = databaseName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public String getDatabaseName() {
- return databaseName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
deleted file mode 100644
index 728837d..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/ExecutionTimeoutException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.exception;
-
-public class ExecutionTimeoutException extends RuntimeException {
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
deleted file mode 100644
index 28b8a50..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/SchedulerException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.exception;
-
-public class SchedulerException extends RuntimeException {
-
- public SchedulerException(String message) {
- super(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java b/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
deleted file mode 100644
index 315a09b..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/api/exception/StoreException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.api.exception;
-
-public class StoreException extends Exception {
-
- public StoreException() {
- super();
- }
-
- public StoreException(String message) {
- super(message);
- }
-
- public StoreException(Exception e) {
- super(e);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java
new file mode 100644
index 0000000..4b79ee4
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataLoader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.sdk.store.Row;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * A Loader is used to load data from files to the table
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface DataLoader extends TransactionalOperation, Serializable {
+ /**
+ * Trigger the load operation
+ * @throws CarbonException if any error occurs
+ */
+ void load() throws CarbonException;
+
+ /**
+ * Append a batch of rows.
+ * @param rows rows to append
+ * @param schema schema of the input row
+ * @throws CarbonException if any error occurs
+ */
+ void append(Iterator<Row> rows, StructType schema) throws CarbonException;
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java
new file mode 100644
index 0000000..c6c5628
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/DataScanner.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface DataScanner<T> extends Serializable {
+
+ /**
+ * Perform a scan in a distributed compute framework like Spark, Presto, etc.
+ * Filter/Projection/Limit operation is pushed down to the scan.
+ *
+ * This should be used with {@link Pruner#prune(TableIdentifier, Expression)}
+ * in a distributed compute environment. It enables the framework to
+ * do a parallel scan by creating multiple {@link ScanUnit} and perform
+ * parallel scan in worker, such as Spark executor
+ *
+ * The return result is in batch so that the caller can start next
+ * level of computation before getting all results, such as
+ * implementing a `prefetch` execution model.
+ *
+ * @param input one scan unit
+ * @return scan result, the result is returned in batch
+ * @throws CarbonException if any error occurs
+ */
+ Iterator<? extends ResultBatch<T>> scan(ScanUnit input) throws CarbonException;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java
new file mode 100644
index 0000000..01c2008
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStore.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * Internal API for engine integration developers
+ */
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface InternalCarbonStore extends CarbonStore {
+
+ /**
+ * Get CarbonTable object from the store
+ *
+ * @param tableIdentifier table identifier
+ * @return CarbonTable object
+ * @throws CarbonException if any error occurs
+ */
+ CarbonTable getCarbonTable(TableIdentifier tableIdentifier) throws CarbonException;
+
+ /**
+ * Return a new Loader that can be used to load data in distributed compute framework
+ * @param load descriptor for load operation
+ * @return a new Loader
+ * @throws CarbonException if any error occurs
+ */
+ DataLoader newLoader(LoadDescriptor load) throws CarbonException;
+
+ /**
+ * Return a new Scanner that can be used in for parallel scan
+ *
+ * @param tableIdentifier table to scan
+ * @param scanOption options for scan, use {@link ScanOption} for the map key
+ * @param readSupport read support to convert the row to output object
+ * @param <T> the target object type contain in {@link ResultBatch}
+ * @return a new Scanner
+ * @throws CarbonException if any error occurs
+ */
+ <T> Scanner<T> newScanner(
+ TableIdentifier tableIdentifier,
+ ScanDescriptor scanDescriptor,
+ Map<String, String> scanOption,
+ CarbonReadSupport<T> readSupport) throws CarbonException;
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java
new file mode 100644
index 0000000..c875aa1
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/InternalCarbonStoreFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.store.impl.InternalCarbonStoreImpl;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public class InternalCarbonStoreFactory {
+
+ private static final Map<String, InternalCarbonStore> stores = new ConcurrentHashMap<>();
+
+ public static synchronized InternalCarbonStore getStore(String storeName, StoreConf conf)
+ throws IOException {
+ InternalCarbonStore store = stores.getOrDefault(storeName, newStore(conf));
+ stores.putIfAbsent(storeName, store);
+ return store;
+ }
+
+ private static InternalCarbonStore newStore(StoreConf conf) throws IOException {
+ return new InternalCarbonStoreImpl(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
new file mode 100644
index 0000000..4a1d2e5
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface Pruner {
+
+ /**
+ * Return an array of ScanUnit which will be the input in
+ * {@link Scanner#scan(ScanUnit)}
+ *
+ * Implementation will leverage index to prune using specified
+ * filter expression
+ *
+ * @param table table identifier
+ * @param filterExpression expression of filter predicate given by user
+ * @return list of ScanUnit which should be passed to
+ * {@link Scanner#scan(ScanUnit)}
+ * @throws CarbonException if any error occurs
+ */
+ List<ScanUnit> prune(TableIdentifier table, Expression filterExpression) throws CarbonException;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java
new file mode 100644
index 0000000..221e0f6
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/ResultBatch.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public interface ResultBatch<T> {
+
+ /**
+ * Return true if the result is returned in columnar batch, otherwise is row by row.
+ * By default, it is columnar batch.
+ */
+ default boolean isColumnar() {
+ return true;
+ }
+
+ /**
+ * Return true if there is more elements in this batch.
+ */
+ boolean hasNext();
+
+ /**
+ * Return next item.
+ * If {@link #isColumnar()} return true, there is only one element in this batch
+ * which is {@link ColumnarBatch}, otherwise, this batch return row by row, caller
+ * should call next() until no element left.
+ */
+ T next();
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java
new file mode 100644
index 0000000..8e5797d
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/ScanOption.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.devapi;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+@InterfaceAudience.Developer("Integration")
+@InterfaceStability.Unstable
+public class ScanOption {
+
+ /** batch size in number of rows in one {@link ResultBatch} */
+ public static final String BATCH_SIZE = "batchSize";
+
+ /**
+ * set to true if return in row major object in {@link ResultBatch},
+ * otherwise columnar object is returned
+ */
+ public static final String ROW_MAJOR = "rowMajor";
+
+ /**
+ * set to true if enable remote prune by RPC call,
+ * otherwise prune executes in caller's JVM
+ */
+ public static final String REMOTE_PRUNE = "remotePrune";
+
+ /**
+ * set to true if enable operator pushdown like scan and load
+ * otherwise operation executes in caller's JVM
+ */
+ public static final String OP_PUSHDOWN = "operatorPushDown";
+
+ /**
+ * Return true if REMOTE_PRUNE is set, default is false
+ */
+ public static boolean isRemotePrune(Map<String, String> options) {
+ if (options == null) {
+ return false;
+ }
+ return Boolean.valueOf(options.getOrDefault(REMOTE_PRUNE, "false"));
+ }
+
+ /**
+ * Return true if REMOTE_PRUNE is set, default is false
+ */
+ public static boolean isOperatorPushdown(Map<String, String> options) {
+ if (options == null) {
+ return false;
+ }
+ return Boolean.valueOf(options.getOrDefault(OP_PUSHDOWN, "false"));
+ }
+}
\ No newline at end of file