You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/08/06 11:04:49 UTC
[2/5] carbondata git commit: [CARBONDATA-2825][CARBONDATA-2828]
CarbonStore and InternalCarbonStore API This closes #2589
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java b/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java
new file mode 100644
index 0000000..2650bbf
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/service/model/ShutdownResponse.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.service.model;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+import org.apache.hadoop.io.Writable;
+
+@InterfaceAudience.Internal
+public class ShutdownResponse implements Serializable, Writable {
+ private int status;
+ private String message;
+
+ public ShutdownResponse() {
+ }
+
+ public ShutdownResponse(int status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(status);
+ out.writeUTF(message);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ status = in.readInt();
+ message = in.readUTF();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java
new file mode 100644
index 0000000..d1b8f43
--- /dev/null
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/DataServiceImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store.impl.worker;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
+import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
+import org.apache.carbondata.store.impl.DataOperation;
+import org.apache.carbondata.store.impl.Status;
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.model.BaseResponse;
+import org.apache.carbondata.store.impl.service.model.LoadDataRequest;
+import org.apache.carbondata.store.impl.service.model.ScanRequest;
+import org.apache.carbondata.store.impl.service.model.ScanResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+@InterfaceAudience.Internal
/**
 * Worker-side implementation of {@link DataService}: executes data loading
 * and scan requests sent from the master over Hadoop RPC.
 */
@InterfaceAudience.Internal
public class DataServiceImpl implements DataService {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(DataServiceImpl.class.getCanonicalName());

  // temp location for loading (writing sort temp files)
  private String[] storeTempLocation;

  // base Hadoop configuration copied per-request in loadData()
  private Configuration hadoopConf;

  DataServiceImpl(Worker worker) {
    this.hadoopConf = worker.getHadoopConf();
    this.storeTempLocation = worker.getConf().storeTempLocation();
  }

  /**
   * Loads CSV data described by the request's CarbonLoadModel into the store.
   * Builds a synthetic MapReduce task-attempt context, enumerates the CSV
   * input splits, and feeds one record-reader iterator per split to a
   * DataLoadExecutor. Failures are reported via the response status rather
   * than thrown.
   *
   * @param request carries the CarbonLoadModel (fact file path, CSV options)
   * @return a BaseResponse with Status.SUCCESS, or Status.FAILURE plus the
   *         exception message on error
   */
  @Override
  public BaseResponse loadData(LoadDataRequest request) {
    DataLoadExecutor executor = null;
    try {
      CarbonLoadModel model = request.getModel();

      JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
      // NOTE(review): return value discarded — presumably invoked for a side
      // effect, otherwise this call is dead; confirm against CarbonInputFormatUtil
      CarbonInputFormatUtil.createJobTrackerID(new Date());
      TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
      TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
      // copy so per-load settings don't leak into the worker's shared conf
      Configuration configuration = new Configuration(hadoopConf);
      StoreUtil.configureCSVInputFormat(configuration, model);
      configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
      // Set up the attempt context required to use in the output committer.
      TaskAttemptContext hadoopAttemptContext =
          new TaskAttemptContextImpl(configuration, taskAttemptId);

      CSVInputFormat format = new CSVInputFormat();
      List<InputSplit> splits = format.getSplits(hadoopAttemptContext);

      // one reader iterator per input split; executed by the load executor
      CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()];
      for (int index = 0; index < splits.size(); index++) {
        readerIterators[index] = new CSVRecordReaderIterator(
            format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index),
            hadoopAttemptContext);
      }

      executor = new DataLoadExecutor();
      executor.execute(model, storeTempLocation, readerIterators);

      return new BaseResponse(Status.SUCCESS.ordinal(), "");
    } catch (IOException e) {
      LOGGER.error(e, "Failed to handle load data");
      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
    } catch (InterruptedException e) {
      LOGGER.error(e, "Interrupted handle load data ");
      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
    } catch (Exception e) {
      LOGGER.error(e, "Failed to execute load data ");
      return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
    } finally {
      if (executor != null) {
        executor.close();
        // release off-heap memory reserved by this load task
        StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
      }
    }
  }

  /**
   * Executes a scan (search) request and returns the matching rows.
   * IOExceptions are converted into a failure response instead of propagating.
   */
  @Override
  public ScanResponse scan(ScanRequest scan) {
    try {
      LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId()));
      List<CarbonRow> rows = DataOperation.scan(scan.getTableInfo(), scan);
      LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId()));
      return createSuccessResponse(scan, rows);
    } catch (IOException e) {
      LOGGER.error(e);
      LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId()));
      return createFailureResponse(scan, e);
    }
  }

  /**
   * create a failure response
   */
  private ScanResponse createFailureResponse(ScanRequest scan, Throwable throwable) {
    return new ScanResponse(scan.getRequestId(), Status.FAILURE.ordinal(),
        throwable.getMessage(), new Object[0][]);
  }

  /**
   * create a success response with result rows
   */
  private ScanResponse createSuccessResponse(ScanRequest scan, List<CarbonRow> rows) {
    Iterator<CarbonRow> itor = rows.iterator();
    Object[][] output = new Object[rows.size()][];
    int i = 0;
    while (itor.hasNext()) {
      output[i++] = itor.next().getData();
    }
    return new ScanResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output);
  }

  /**
   * Hadoop RPC version handshake; versionID is inherited from the
   * DataService protocol interface.
   */
  @Override
  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
    return versionID;
  }

  // Returning null disables fine-grained signature checking; the client falls
  // back to version-number comparison only.
  @Override
  public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
      int clientMethodsHash) throws IOException {
    return null;
  }

  /**
   * Shuts down the search-mode query thread pools used by scans.
   */
  @Override
  public void close() throws IOException {
    LOGGER.info("Shutting down worker...");
    SearchModeDetailQueryExecutor.shutdownThreadPool();
    SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
    LOGGER.info("Worker shut down");
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
deleted file mode 100644
index fd13b20..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/RequestHandler.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl.worker;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
-import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
-import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.impl.CarbonStoreBase;
-import org.apache.carbondata.store.impl.Status;
-import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
-import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * It handles request from master.
- */
-@InterfaceAudience.Internal
-class RequestHandler {
-
- private StoreConf storeConf;
- private Configuration hadoopConf;
-
- RequestHandler(StoreConf conf, Configuration hadoopConf) {
- this.storeConf = conf;
- this.hadoopConf = hadoopConf;
- }
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(RequestHandler.class.getName());
-
- QueryResponse handleScan(Scan scan) {
- try {
- LOGGER.info(String.format("[QueryId:%d] receive search request", scan.getRequestId()));
- CarbonTable table = CarbonTable.buildFromTableInfo(scan.getTableInfo());
- List<CarbonRow> rows = CarbonStoreBase.scan(table, scan);
- LOGGER.info(String.format("[QueryId:%d] sending success response", scan.getRequestId()));
- return createSuccessResponse(scan, rows);
- } catch (IOException e) {
- LOGGER.error(e);
- LOGGER.info(String.format("[QueryId:%d] sending failure response", scan.getRequestId()));
- return createFailureResponse(scan, e);
- }
- }
-
- ShutdownResponse handleShutdown(ShutdownRequest request) {
- LOGGER.info("Shutting down worker...");
- SearchModeDetailQueryExecutor.shutdownThreadPool();
- SearchModeVectorDetailQueryExecutor.shutdownThreadPool();
- LOGGER.info("Worker shut down");
- return new ShutdownResponse(Status.SUCCESS.ordinal(), "");
- }
-
- /**
- * create a failure response
- */
- private QueryResponse createFailureResponse(Scan scan, Throwable throwable) {
- return new QueryResponse(scan.getRequestId(), Status.FAILURE.ordinal(),
- throwable.getMessage(), new Object[0][]);
- }
-
- /**
- * create a success response with result rows
- */
- private QueryResponse createSuccessResponse(Scan scan, List<CarbonRow> rows) {
- Iterator<CarbonRow> itor = rows.iterator();
- Object[][] output = new Object[rows.size()][];
- int i = 0;
- while (itor.hasNext()) {
- output[i++] = itor.next().getData();
- }
- return new QueryResponse(scan.getRequestId(), Status.SUCCESS.ordinal(), "", output);
- }
-
- public BaseResponse handleLoadData(LoadDataRequest request) {
- DataLoadExecutor executor = null;
- try {
- CarbonLoadModel model = request.getModel();
-
- JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
- CarbonInputFormatUtil.createJobTrackerID(new Date());
- TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
- TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
- Configuration configuration = new Configuration(hadoopConf);
- StoreUtil.configureCSVInputFormat(configuration, model);
- configuration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath());
- // Set up the attempt context required to use in the output committer.
- TaskAttemptContext hadoopAttemptContext =
- new TaskAttemptContextImpl(configuration, taskAttemptId);
-
- CSVInputFormat format = new CSVInputFormat();
- List<InputSplit> splits = format.getSplits(hadoopAttemptContext);
-
- CarbonIterator<Object[]>[] readerIterators = new CSVRecordReaderIterator[splits.size()];
- for (int index = 0; index < splits.size(); index++) {
- readerIterators[index] = new CSVRecordReaderIterator(
- format.createRecordReader(splits.get(index), hadoopAttemptContext), splits.get(index),
- hadoopAttemptContext);
- }
-
- executor = new DataLoadExecutor();
- executor.execute(model, storeConf.storeTempLocation(), readerIterators);
-
- return new BaseResponse(Status.SUCCESS.ordinal(), "");
- } catch (IOException e) {
- LOGGER.error(e, "Failed to handle load data");
- return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
- } catch (InterruptedException e) {
- LOGGER.error(e, "Interrupted handle load data ");
- return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
- } catch (Exception e) {
- LOGGER.error(e, "Failed to execute load data ");
- return new BaseResponse(Status.FAILURE.ordinal(), e.getMessage());
- } finally {
- if (executor != null) {
- executor.close();
- StoreUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
deleted file mode 100644
index 26f252c..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/StoreServiceImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.impl.worker;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.store.impl.rpc.StoreService;
-import org.apache.carbondata.store.impl.rpc.model.BaseResponse;
-import org.apache.carbondata.store.impl.rpc.model.LoadDataRequest;
-import org.apache.carbondata.store.impl.rpc.model.QueryResponse;
-import org.apache.carbondata.store.impl.rpc.model.Scan;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownRequest;
-import org.apache.carbondata.store.impl.rpc.model.ShutdownResponse;
-
-import org.apache.hadoop.ipc.ProtocolSignature;
-
-@InterfaceAudience.Internal
-public class StoreServiceImpl implements StoreService {
-
- private Worker worker;
- RequestHandler handler;
-
- public StoreServiceImpl(Worker worker) {
- this.worker = worker;
- this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf());
- }
-
- @Override
- public BaseResponse loadData(LoadDataRequest request) {
- return handler.handleLoadData(request);
- }
-
- @Override
- public QueryResponse query(Scan scan) {
- return handler.handleScan(scan);
- }
-
- @Override
- public ShutdownResponse shutdown(ShutdownRequest request) {
- return handler.handleShutdown(request);
- }
-
- @Override
- public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
- return versionID;
- }
-
- @Override
- public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
- int clientMethodsHash) throws IOException {
- return null;
- }
-
- public Worker getWorker() {
- return worker;
- }
-
- public void setWorker(Worker worker) {
- this.worker = worker;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
index a360e36..e3546d9 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/worker/Worker.java
@@ -23,13 +23,13 @@ import java.net.BindException;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.impl.rpc.RegistryService;
-import org.apache.carbondata.store.impl.rpc.ServiceFactory;
-import org.apache.carbondata.store.impl.rpc.StoreService;
-import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerRequest;
-import org.apache.carbondata.store.impl.rpc.model.RegisterWorkerResponse;
-import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
+import org.apache.carbondata.store.impl.service.DataService;
+import org.apache.carbondata.store.impl.service.RegistryService;
+import org.apache.carbondata.store.impl.service.ServiceFactory;
+import org.apache.carbondata.store.impl.service.model.RegisterWorkerRequest;
+import org.apache.carbondata.store.impl.service.model.RegisterWorkerResponse;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
@@ -66,15 +66,15 @@ public class Worker {
int coreNum = conf.workerCoreNum();
String host = conf.workerHost();
int port = conf.workerPort();
- StoreService queryService = new StoreServiceImpl(this);
+ DataService dataService = new DataServiceImpl(this);
do {
try {
server = new RPC.Builder(hadoopConf)
.setNumHandlers(coreNum)
.setBindAddress(host)
.setPort(port)
- .setProtocol(StoreService.class)
- .setInstance(queryService)
+ .setProtocol(DataService.class)
+ .setInstance(dataService)
.build();
server.start();
@@ -116,9 +116,11 @@ public class Worker {
}
private void registerToMaster() throws IOException {
- LOGGER.info("trying to register to master " + conf.masterHost() + ":" + conf.masterPort());
+ LOGGER.info("trying to register to master " +
+ conf.masterHost() + ":" + conf.registryServicePort());
if (registry == null) {
- registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort());
+ registry = ServiceFactory.createRegistryService(
+ conf.masterHost(), conf.registryServicePort());
}
RegisterWorkerRequest request =
new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
deleted file mode 100644
index 775669f..0000000
--- a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.store.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.memory.UnsafeMemoryManager;
-import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.store.api.conf.StoreConf;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.PropertyConfigurator;
-
-@InterfaceAudience.Internal
-public class StoreUtil {
-
- private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName());
-
- public static void loadProperties(String filePath, StoreConf conf) {
- InputStream input = null;
- try {
- input = new FileInputStream(filePath);
- Properties prop = new Properties();
- prop.load(input);
- for (Map.Entry<Object, Object> entry : prop.entrySet()) {
- conf.conf(entry.getKey().toString(), entry.getValue().toString());
- }
- LOGGER.audit("loaded properties: " + filePath);
- } catch (IOException ex) {
- LOGGER.error(ex, "Failed to load properties file " + filePath);
- } finally {
- if (input != null) {
- try {
- input.close();
- } catch (IOException e) {
- LOGGER.error(e);
- }
- }
- }
- }
-
- public static void initLog4j(String propertiesFilePath) {
- BasicConfigurator.configure();
- PropertyConfigurator.configure(propertiesFilePath);
- }
-
- public static byte[] serialize(Object object) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
- try {
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(object);
- } catch (IOException e) {
- LOGGER.error(e);
- }
- return baos.toByteArray();
- }
-
- public static Object deserialize(byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- try {
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
- return ois.readObject();
- } catch (IOException e) {
- LOGGER.error(e);
- } catch (ClassNotFoundException e) {
- LOGGER.error(e);
- }
- return null;
- }
-
- public static void configureCSVInputFormat(Configuration configuration,
- CarbonLoadModel carbonLoadModel) {
- CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar());
- CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter());
- CSVInputFormat.setSkipEmptyLine(configuration, carbonLoadModel.getSkipEmptyLine());
- CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar());
- CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns());
- CSVInputFormat.setNumberOfColumns(configuration,
- "" + carbonLoadModel.getCsvHeaderColumns().length);
-
- CSVInputFormat.setHeaderExtractionEnabled(
- configuration,
- carbonLoadModel.getCsvHeader() == null ||
- StringUtils.isEmpty(carbonLoadModel.getCsvHeader()));
-
- CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar());
-
- CSVInputFormat.setReadBufferSize(
- configuration,
- CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
- CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
- }
-
- public static void clearUnsafeMemory(long taskId) {
- UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId);
- UnsafeSortMemoryManager.INSTANCE.freeMemoryAll(taskId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
index 2448660..7f80a33 100644
--- a/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
+++ b/store/core/src/test/java/org/apache/carbondata/store/DistributedCarbonStoreTest.java
@@ -27,14 +27,15 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.CarbonStoreFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.CarbonStoreFactory;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.store.impl.master.Master;
import org.apache.carbondata.store.impl.worker.Worker;
import org.junit.After;
@@ -50,24 +51,36 @@ public class DistributedCarbonStoreTest {
private static CarbonStore store;
@BeforeClass
- public static void beforeAll() throws IOException, StoreException {
+ public static void beforeAll() throws IOException, CarbonException, InterruptedException {
projectFolder = new File(DistributedCarbonStoreTest.class.getResource("/").getPath() +
"../../../../").getCanonicalPath();
+
String confFile = projectFolder + "/store/conf/store.conf";
StoreConf storeConf = new StoreConf(confFile);
- store = CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf);
- projectFolder = new File(LocalCarbonStoreTest.class.getResource("/").getPath() + "../../../../")
- .getCanonicalPath();
+ new Thread(() -> {
+ try {
+ Master.main(new String[]{"", confFile});
+ } catch (InterruptedException | IOException e) {
+ throw new RuntimeException("failed to start master");
+ }
+ }).start();
+ Thread.sleep(1000);
// start worker
Worker worker = new Worker(storeConf);
worker.start();
+
+ Thread.sleep(1000);
+
+ store = CarbonStoreFactory.getDistributedStore("DistributedCarbonStoreTest", storeConf);
}
@AfterClass
public static void afterAll() throws IOException {
- store.close();
+ if (store != null) {
+ store.close();
+ }
}
@Before
@@ -81,13 +94,13 @@ public class DistributedCarbonStoreTest {
}
@Test
- public void testSelect() throws IOException, StoreException {
+ public void testSelect() throws CarbonException {
TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default");
store.dropTable(tableIdentifier);
- TableDescriptor table = TableDescriptor
+ TableDescriptor descriptor = TableDescriptor
.builder()
- .ifNotExists()
.table(tableIdentifier)
+ .ifNotExists()
.comment("first table")
.column("shortField", DataTypes.SHORT, "short field")
.column("intField", DataTypes.INT, "int field")
@@ -101,7 +114,7 @@ public class DistributedCarbonStoreTest {
.column("floatField", DataTypes.DOUBLE, "float field")
.tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
.create();
- store.createTable(table);
+ store.createTable(descriptor);
// load one segment
LoadDescriptor load = LoadDescriptor
@@ -114,26 +127,26 @@ public class DistributedCarbonStoreTest {
store.loadData(load);
// select row
- SelectDescriptor select = SelectDescriptor
+ ScanDescriptor select = ScanDescriptor
.builder()
.table(tableIdentifier)
- .select("intField", "stringField")
+ .select(new String[]{"intField", "stringField"})
.limit(5)
.create();
- List<CarbonRow> result = store.select(select);
+ List<CarbonRow> result = store.scan(select);
Assert.assertEquals(5, result.size());
// select row with filter
- SelectDescriptor select2 = SelectDescriptor
+ ScanDescriptor select2 = ScanDescriptor
.builder()
.table(tableIdentifier)
- .select("intField", "stringField")
+ .select(new String[]{"intField", "stringField"})
.filter(new EqualToExpression(
new ColumnExpression("intField", DataTypes.INT),
new LiteralExpression(11, DataTypes.INT)))
.limit(5)
.create();
- List<CarbonRow> result2 = store.select(select2);
+ List<CarbonRow> result2 = store.scan(select2);
Assert.assertEquals(1, result2.size());
store.dropTable(tableIdentifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
index 420c8cf..8cf7e14 100644
--- a/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
+++ b/store/core/src/test/java/org/apache/carbondata/store/LocalCarbonStoreTest.java
@@ -22,20 +22,19 @@ import java.io.IOException;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.CarbonStoreFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.sdk.store.CarbonStore;
+import org.apache.carbondata.sdk.store.CarbonStoreFactory;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
import org.junit.After;
import org.junit.AfterClass;
@@ -50,7 +49,7 @@ public class LocalCarbonStoreTest {
private static CarbonStore store;
@BeforeClass
- public static void setup() throws IOException, StoreException {
+ public static void setup() throws IOException, CarbonException {
StoreConf conf = new StoreConf("test", "./");
conf.conf(StoreConf.STORE_TEMP_LOCATION, "./temp");
store = CarbonStoreFactory.getLocalStore("LocalCarbonStoreTest", conf);
@@ -74,13 +73,13 @@ public class LocalCarbonStoreTest {
}
@Test
- public void testWriteAndReadFiles() throws IOException, StoreException {
+ public void testWriteAndReadFiles() throws IOException, CarbonException {
TableIdentifier tableIdentifier = new TableIdentifier("table_1", "default");
store.dropTable(tableIdentifier);
- TableDescriptor table = TableDescriptor
+ TableDescriptor descriptor = TableDescriptor
.builder()
- .ifNotExists()
.table(tableIdentifier)
+ .ifNotExists()
.comment("first table")
.column("shortField", DataTypes.SHORT, "short field")
.column("intField", DataTypes.INT, "int field")
@@ -94,7 +93,7 @@ public class LocalCarbonStoreTest {
.column("floatField", DataTypes.DOUBLE, "float field")
.tblProperties(CarbonCommonConstants.SORT_COLUMNS, "intField")
.create();
- store.createTable(table);
+ store.createTable(descriptor);
// load one segment
LoadDescriptor load = LoadDescriptor
@@ -107,26 +106,26 @@ public class LocalCarbonStoreTest {
store.loadData(load);
// select row
- SelectDescriptor select = SelectDescriptor
+ ScanDescriptor select = ScanDescriptor
.builder()
.table(tableIdentifier)
- .select("intField", "stringField")
+ .select(new String[]{"intField", "stringField"})
.limit(5)
.create();
- List<CarbonRow> result = store.select(select);
+ List<CarbonRow> result = store.scan(select);
Assert.assertEquals(5, result.size());
// select row with filter
- SelectDescriptor select2 = SelectDescriptor
+ ScanDescriptor select2 = ScanDescriptor
.builder()
.table(tableIdentifier)
- .select("intField", "stringField")
+ .select(new String[]{"intField", "stringField"})
.filter(new EqualToExpression(
new ColumnExpression("intField", DataTypes.INT),
new LiteralExpression(11, DataTypes.INT)))
.limit(5)
.create();
- List<CarbonRow> result2 = store.select(select2);
+ List<CarbonRow> result2 = store.scan(select2);
Assert.assertEquals(1, result2.size());
store.dropTable(tableIdentifier);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
index f73591c..dffc8a7 100644
--- a/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
+++ b/store/core/src/test/java/org/apache/carbondata/store/TestUtil.java
@@ -18,21 +18,11 @@
package org.apache.carbondata.store;
import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.sdk.file.CarbonWriter;
-import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
-import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.util.StoreUtil;
-
-import org.junit.Assert;
public class TestUtil {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
index eaa4583..e1b039e 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/HorizonClient.java
@@ -28,7 +28,7 @@ import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
/**
* Client to send REST request to Horizon service
@@ -40,9 +40,9 @@ public interface HorizonClient extends Closeable {
/**
* Create a Table
* @param create descriptor for create table operation
- * @throws IOException if network or disk IO error occurs
+ * @throws CarbonException if network or disk IO error occurs
*/
- void createTable(CreateTableRequest create) throws IOException, StoreException;
+ void createTable(CreateTableRequest create) throws CarbonException;
/**
* Drop a Table, and remove all data in it
@@ -56,7 +56,7 @@ public interface HorizonClient extends Closeable {
* @param load descriptor for load operation
* @throws IOException if network or disk IO error occurs
*/
- void loadData(LoadRequest load) throws IOException, StoreException;
+ void loadData(LoadRequest load) throws IOException, CarbonException;
/**
* Scan a Table and return matched rows
@@ -64,7 +64,7 @@ public interface HorizonClient extends Closeable {
* @return matched rows
* @throws IOException if network or disk IO error occurs
*/
- List<CarbonRow> select(SelectRequest select) throws IOException, StoreException;
+ List<CarbonRow> select(SelectRequest select) throws IOException, CarbonException;
/**
* Executor a SQL statement
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
index 076df70..b24c8d2 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/client/impl/SimpleHorizonClient.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
@@ -52,7 +52,7 @@ public class SimpleHorizonClient implements HorizonClient {
}
@Override
- public void createTable(CreateTableRequest create) throws IOException, StoreException {
+ public void createTable(CreateTableRequest create) throws CarbonException {
Objects.requireNonNull(create);
restTemplate.postForEntity(serviceUri + "/table/create", create, String.class);
}
@@ -64,18 +64,18 @@ public class SimpleHorizonClient implements HorizonClient {
}
@Override
- public void loadData(LoadRequest load) throws IOException, StoreException {
+ public void loadData(LoadRequest load) throws IOException, CarbonException {
Objects.requireNonNull(load);
restTemplate.postForEntity(serviceUri + "/table/load", load, String.class);
}
@Override
- public List<CarbonRow> select(SelectRequest select) throws IOException, StoreException {
+ public List<CarbonRow> select(SelectRequest select) throws IOException, CarbonException {
Objects.requireNonNull(select);
ResponseEntity<SelectResponse> response =
restTemplate.postForEntity(serviceUri + "/table/select", select, SelectResponse.class);
- Object[][] rows = Objects.requireNonNull(response.getBody()).getRows();
- List<CarbonRow> output = new ArrayList<>(rows.length);
+ List<Object[]> rows = Objects.requireNonNull(response.getBody()).getRows();
+ List<CarbonRow> output = new ArrayList<>(rows.size());
for (Object[] row : rows) {
output.add(new CarbonRow(row));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
index a30b587..cffca07 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/Horizon.java
@@ -17,7 +17,8 @@
package org.apache.carbondata.horizon.rest.controller;
-import org.apache.carbondata.store.api.conf.StoreConf;
+
+import org.apache.carbondata.sdk.store.conf.StoreConf;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
index a273f54..d3d1bd7 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/controller/HorizonController.java
@@ -18,7 +18,8 @@ package org.apache.carbondata.horizon.rest.controller;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -36,14 +37,17 @@ import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
-import org.apache.carbondata.store.api.CarbonStore;
-import org.apache.carbondata.store.api.CarbonStoreFactory;
-import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.SelectDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.conf.StoreConf;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+import org.apache.carbondata.store.devapi.InternalCarbonStore;
+import org.apache.carbondata.store.devapi.InternalCarbonStoreFactory;
+import org.apache.carbondata.store.devapi.ResultBatch;
+import org.apache.carbondata.store.devapi.ScanUnit;
+import org.apache.carbondata.store.devapi.Scanner;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@@ -59,15 +63,15 @@ public class HorizonController {
private static LogService LOGGER =
LogServiceFactory.getLogService(HorizonController.class.getName());
- private CarbonStore store;
+ private InternalCarbonStore store;
- public HorizonController() throws StoreException {
+ public HorizonController() throws CarbonException {
String storeFile = System.getProperty("carbonstore.conf.file");
StoreConf storeConf = new StoreConf();
try {
storeConf.conf(StoreConf.STORE_LOCATION, CarbonProperties.getStorePath())
.conf(StoreConf.MASTER_HOST, InetAddress.getLocalHost().getHostAddress())
- .conf(StoreConf.MASTER_PORT, CarbonProperties.getSearchMasterPort())
+ .conf(StoreConf.STORE_PORT, CarbonProperties.getSearchMasterPort())
.conf(StoreConf.WORKER_HOST, InetAddress.getLocalHost().getHostAddress())
.conf(StoreConf.WORKER_PORT, CarbonProperties.getSearchWorkerPort())
.conf(StoreConf.WORKER_CORE_NUM, 2);
@@ -76,13 +80,10 @@ public class HorizonController {
storeConf.load(storeFile);
}
- } catch (UnknownHostException e) {
- throw new StoreException(e);
+ store = InternalCarbonStoreFactory.getStore(storeConf);
} catch (IOException e) {
- throw new StoreException(e);
+ throw new CarbonException(e);
}
-
- store = CarbonStoreFactory.getDistributedStore("GlobalStore", storeConf);
}
@RequestMapping(value = "echo")
@@ -92,7 +93,7 @@ public class HorizonController {
@RequestMapping(value = "/table/create", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> createTable(
- @RequestBody CreateTableRequest request) throws StoreException, IOException {
+ @RequestBody CreateTableRequest request) throws CarbonException {
RequestValidator.validateTable(request);
TableDescriptor tableDescriptor = request.convertToDto();
store.createTable(tableDescriptor);
@@ -101,7 +102,7 @@ public class HorizonController {
@RequestMapping(value = "/table/drop", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> dropTable(
- @RequestBody DropTableRequest request) throws StoreException, IOException {
+ @RequestBody DropTableRequest request) throws CarbonException {
RequestValidator.validateDrop(request);
store.dropTable(new TableIdentifier(request.getTableName(), request.getDatabaseName()));
return new ResponseEntity<>(String.valueOf(true), HttpStatus.OK);
@@ -109,7 +110,7 @@ public class HorizonController {
@RequestMapping(value = "/table/load", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> load(@RequestBody LoadRequest request)
- throws StoreException, IOException {
+ throws CarbonException, IOException {
RequestValidator.validateLoad(request);
LoadDescriptor loadDescriptor = request.convertToDto();
store.loadData(loadDescriptor);
@@ -118,22 +119,29 @@ public class HorizonController {
@RequestMapping(value = "/table/select", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<SelectResponse> select(@RequestBody SelectRequest request)
- throws StoreException, IOException {
+ throws CarbonException {
long start = System.currentTimeMillis();
RequestValidator.validateSelect(request);
TableIdentifier table = new TableIdentifier(request.getTableName(), request.getDatabaseName());
- CarbonTable carbonTable = store.getTable(table);
+ CarbonTable carbonTable = store.getCarbonTable(table);
Expression expression = Parser.parseFilter(request.getFilter(), carbonTable);
- SelectDescriptor selectDescriptor = new SelectDescriptor(
+ Scanner<CarbonRow> scanner = store.newScanner(table);
+ List<ScanUnit> scanUnits = scanner.prune(table, expression);
+ ScanDescriptor scanDescriptor = new ScanDescriptor(
table, request.getSelect(), expression, request.getLimit());
- List<CarbonRow> result = store.select(selectDescriptor);
- Iterator<CarbonRow> iterator = result.iterator();
- Object[][] output = new Object[result.size()][];
- int i = 0;
- while (iterator.hasNext()) {
- output[i] = (iterator.next().getData());
- i++;
+ ArrayList<Object[]> output = new ArrayList<>();
+ for (ScanUnit scanUnit : scanUnits) {
+ Iterator<? extends ResultBatch<CarbonRow>> iterator = scanner.scan(
+ scanUnit, scanDescriptor, new HashMap<String, String>());
+
+ while (iterator.hasNext()) {
+ ResultBatch<CarbonRow> rows = iterator.next();
+ while (rows.hasNext()) {
+ output.add(rows.next().getData());
+ }
+ }
}
+
long end = System.currentTimeMillis();
LOGGER.audit("[" + request.getRequestId() + "] HorizonController select " +
request.getDatabaseName() + "." + request.getTableName() +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
index fbba57b..15a30cb 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/validate/RequestValidator.java
@@ -21,63 +21,63 @@ import org.apache.carbondata.horizon.rest.model.view.CreateTableRequest;
import org.apache.carbondata.horizon.rest.model.view.DropTableRequest;
import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
import org.apache.commons.lang.StringUtils;
public class RequestValidator {
- public static void validateSelect(SelectRequest request) throws StoreException {
+ public static void validateSelect(SelectRequest request) throws CarbonException {
if (request == null) {
- throw new StoreException("Select should not be null");
+ throw new CarbonException("Select should not be null");
}
if (StringUtils.isEmpty(request.getDatabaseName())) {
- throw new StoreException("database name is invalid");
+ throw new CarbonException("database name is invalid");
}
if (StringUtils.isEmpty(request.getTableName())) {
- throw new StoreException("table name is invalid");
+ throw new CarbonException("table name is invalid");
}
}
- public static void validateTable(CreateTableRequest request) throws StoreException {
+ public static void validateTable(CreateTableRequest request) throws CarbonException {
if (request == null) {
- throw new StoreException("TableDescriptor should not be null");
+ throw new CarbonException("TableDescriptor should not be null");
}
if (StringUtils.isEmpty(request.getDatabaseName())) {
- throw new StoreException("database name is invalid");
+ throw new CarbonException("database name is invalid");
}
if (StringUtils.isEmpty(request.getTableName())) {
- throw new StoreException("table name is invalid");
+ throw new CarbonException("table name is invalid");
}
if (request.getFields() == null || request.getFields().length == 0) {
- throw new StoreException("fields should not be empty");
+ throw new CarbonException("fields should not be empty");
}
}
- public static void validateLoad(LoadRequest request) throws StoreException {
+ public static void validateLoad(LoadRequest request) throws CarbonException {
if (request == null) {
- throw new StoreException("LoadDescriptor should not be null");
+ throw new CarbonException("LoadDescriptor should not be null");
}
if (StringUtils.isEmpty(request.getDatabaseName())) {
- throw new StoreException("database name is invalid");
+ throw new CarbonException("database name is invalid");
}
if (StringUtils.isEmpty(request.getTableName())) {
- throw new StoreException("table name is invalid");
+ throw new CarbonException("table name is invalid");
}
if (StringUtils.isEmpty(request.getInputPath())) {
- throw new StoreException("input path is invalid");
+ throw new CarbonException("input path is invalid");
}
}
- public static void validateDrop(DropTableRequest request) throws StoreException {
+ public static void validateDrop(DropTableRequest request) throws CarbonException {
if (request == null) {
- throw new StoreException("DropTableRequest should not be null");
+ throw new CarbonException("DropTableRequest should not be null");
}
if (StringUtils.isEmpty(request.getDatabaseName())) {
- throw new StoreException("database name is invalid");
+ throw new CarbonException("database name is invalid");
}
if (StringUtils.isEmpty(request.getTableName())) {
- throw new StoreException("table name is invalid");
+ throw new CarbonException("table name is invalid");
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
index cf59f7f..623abee 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/CreateTableRequest.java
@@ -24,8 +24,8 @@ import java.util.Map;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;
-import org.apache.carbondata.store.api.descriptor.TableDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
public class CreateTableRequest extends Request {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
index b809d9e..200b3a2 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/FieldRequest.java
@@ -108,7 +108,7 @@ public class FieldRequest {
field.setPrecision(precision);
field.setScale(scale);
field.setColumnComment(comment);
- field.setChildren(new LinkedList<StructField>());
+ field.setChildren(new LinkedList<Field>());
return field;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
index c91f5f5..dfe21f6 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/LoadRequest.java
@@ -20,8 +20,8 @@ package org.apache.carbondata.horizon.rest.model.view;
import java.util.HashMap;
import java.util.Map;
-import org.apache.carbondata.store.api.descriptor.LoadDescriptor;
-import org.apache.carbondata.store.api.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
public class LoadRequest extends Request {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
index 6bf5c75..e4f8cf8 100644
--- a/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
+++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/rest/model/view/SelectResponse.java
@@ -17,23 +17,22 @@
package org.apache.carbondata.horizon.rest.model.view;
+import java.util.List;
+
public class SelectResponse extends Response {
- private Object[][] rows;
+ private List<Object[]> rows;
public SelectResponse() {
}
- public SelectResponse(SelectRequest request, String message, Object[][] rows) {
+ public SelectResponse(SelectRequest request, String message, List<Object[]> rows) {
super(request, message);
this.rows = rows;
}
- public Object[][] getRows() {
+ public List<Object[]> getRows() {
return rows;
}
- public void setRows(Object[][] rows) {
- this.rows = rows;
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
----------------------------------------------------------------------
diff --git a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
index 91a9dba..59c4bd1 100644
--- a/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
+++ b/store/horizon/src/test/java/org/apache/carbondata/horizon/HorizonTest.java
@@ -32,9 +32,9 @@ import org.apache.carbondata.horizon.rest.model.view.LoadRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectRequest;
import org.apache.carbondata.horizon.rest.model.view.SelectResponse;
import org.apache.carbondata.store.api.conf.StoreConf;
-import org.apache.carbondata.store.api.exception.StoreException;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
import org.apache.carbondata.store.impl.worker.Worker;
-import org.apache.carbondata.store.util.StoreUtil;
+import org.apache.carbondata.sdk.store.util.StoreUtil;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -109,13 +109,13 @@ public class HorizonTest {
SelectRequest select = createSelectRequest(5, null, "intField", "stringField");
SelectResponse result =
restTemplate.postForObject(serviceUri + "/table/select", select, SelectResponse.class);
- Assert.assertEquals(5, result.getRows().length);
+ Assert.assertEquals(5, result.getRows().size());
// select row with filter
SelectRequest filter = createSelectRequest(5, "intField = 11", "intField", "stringField");
SelectResponse filterResult =
restTemplate.postForObject(serviceUri + "/table/select", filter, SelectResponse.class);
- Assert.assertEquals(1, filterResult.getRows().length);
+ Assert.assertEquals(1, filterResult.getRows().size());
request = createDropTableRequest();
response = restTemplate.postForObject(serviceUri + "/table/drop", request, String.class);
@@ -173,7 +173,7 @@ public class HorizonTest {
}
@Test
- public void testHorizonClient() throws IOException, StoreException {
+ public void testHorizonClient() throws IOException, CarbonException {
HorizonClient client = new SimpleHorizonClient(serviceUri);
DropTableRequest drop = createDropTableRequest();
client.dropTable(drop);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index aecf7e2..b16a290 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -51,8 +51,8 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index fdd1f5a..771896b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.logging.LogService;
@@ -30,6 +31,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDi
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
@@ -239,9 +241,9 @@ public class AvroCarbonWriter extends CarbonWriter {
return new Field(FieldName, DataTypes.DOUBLE);
case RECORD:
// recursively get the sub fields
- ArrayList<StructField> structSubFields = new ArrayList<>();
+ ArrayList<Field> structSubFields = new ArrayList<>();
for (Schema.Field avroSubField : childSchema.getFields()) {
- StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+ Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
if (structField != null) {
structSubFields.add(structField);
}
@@ -249,9 +251,9 @@ public class AvroCarbonWriter extends CarbonWriter {
return new Field(FieldName, "struct", structSubFields);
case ARRAY:
// recursively get the sub fields
- ArrayList<StructField> arraySubField = new ArrayList<>();
+ ArrayList<Field> arraySubField = new ArrayList<>();
// array will have only one sub field.
- StructField structField = prepareSubFields("val", childSchema.getElementType());
+ Field structField = prepareSubFields("val", childSchema.getElementType());
if (structField != null) {
arraySubField.add(structField);
return new Field(FieldName, "array", arraySubField);
@@ -266,51 +268,51 @@ public class AvroCarbonWriter extends CarbonWriter {
}
}
- private static StructField prepareSubFields(String FieldName, Schema childSchema) {
+ private static Field prepareSubFields(String FieldName, Schema childSchema) {
Schema.Type type = childSchema.getType();
LogicalType logicalType = childSchema.getLogicalType();
switch (type) {
case BOOLEAN:
- return new StructField(FieldName, DataTypes.BOOLEAN);
+ return new Field(FieldName, DataTypes.BOOLEAN);
case INT:
if (logicalType instanceof LogicalTypes.Date) {
- return new StructField(FieldName, DataTypes.DATE);
+ return new Field(FieldName, DataTypes.DATE);
} else {
LOGGER.warn("Unsupported logical type. Considering Data Type as INT for " + childSchema
.getName());
- return new StructField(FieldName, DataTypes.INT);
+ return new Field(FieldName, DataTypes.INT);
}
case LONG:
if (logicalType instanceof LogicalTypes.TimestampMillis
|| logicalType instanceof LogicalTypes.TimestampMicros) {
- return new StructField(FieldName, DataTypes.TIMESTAMP);
+ return new Field(FieldName, DataTypes.TIMESTAMP);
} else {
LOGGER.warn("Unsupported logical type. Considering Data Type as LONG for " + childSchema
.getName());
- return new StructField(FieldName, DataTypes.LONG);
+ return new Field(FieldName, DataTypes.LONG);
}
case DOUBLE:
- return new StructField(FieldName, DataTypes.DOUBLE);
+ return new Field(FieldName, DataTypes.DOUBLE);
case STRING:
- return new StructField(FieldName, DataTypes.STRING);
+ return new Field(FieldName, DataTypes.STRING);
case FLOAT:
- return new StructField(FieldName, DataTypes.DOUBLE);
+ return new Field(FieldName, DataTypes.DOUBLE);
case RECORD:
// recursively get the sub fields
- ArrayList<StructField> structSubFields = new ArrayList<>();
+ ArrayList<Field> structSubFields = new ArrayList<>();
for (Schema.Field avroSubField : childSchema.getFields()) {
- StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+ Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
if (structField != null) {
structSubFields.add(structField);
}
}
- return (new StructField(FieldName, DataTypes.createStructType(structSubFields)));
+ return (new Field(FieldName, createStructType(structSubFields)));
case ARRAY:
// recursively get the sub fields
// array will have only one sub field.
DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
if (subType != null) {
- return (new StructField(FieldName, DataTypes.createArrayType(subType)));
+ return (new Field(FieldName, DataTypes.createArrayType(subType)));
} else {
return null;
}
@@ -322,6 +324,14 @@ public class AvroCarbonWriter extends CarbonWriter {
}
}
+ private static StructType createStructType(List<Field> fields) {
+ List<StructField> f = fields.stream().map(field ->
+ new StructField(field.getFieldName(), field.getDataType(),
+ createStructType(field.getChildren()).getFields())
+ ).collect(Collectors.toList());
+ return DataTypes.createStructType(f);
+ }
+
private static DataType getMappingDataTypeForArrayRecord(Schema childSchema) {
LogicalType logicalType = childSchema.getLogicalType();
switch (childSchema.getType()) {
@@ -360,14 +370,14 @@ public class AvroCarbonWriter extends CarbonWriter {
return DataTypes.DOUBLE;
case RECORD:
// recursively get the sub fields
- ArrayList<StructField> structSubFields = new ArrayList<>();
+ ArrayList<Field> structSubFields = new ArrayList<>();
for (Schema.Field avroSubField : childSchema.getFields()) {
- StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+ Field structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
if (structField != null) {
structSubFields.add(structField);
}
}
- return DataTypes.createStructType(structSubFields);
+ return createStructType(structSubFields);
case ARRAY:
// array will have only one sub field.
DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index a9d725f..92ed0d8 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -513,8 +513,8 @@ public class CarbonWriterBuilder {
} else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) {
// Loop through the inner columns and for a StructData
List<StructField> structFieldsArray =
- new ArrayList<StructField>(field.getChildren().size());
- for (StructField childFld : field.getChildren()) {
+ new ArrayList<>(field.getChildren().size());
+ for (Field childFld : field.getChildren()) {
structFieldsArray
.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index add10c1..0d70c3b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -17,13 +17,13 @@
package org.apache.carbondata.sdk.file;
+import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -32,11 +32,11 @@ import org.apache.carbondata.core.util.DataTypeUtil;
*/
@InterfaceAudience.User
@InterfaceStability.Unstable
-public class Field {
+public class Field implements Serializable {
private String name;
private DataType type;
- private List<StructField> children;
+ private List<Field> children;
private String parent;
private String storeType = "columnnar";
private int schemaOrdinal = -1;
@@ -54,11 +54,11 @@ public class Field {
this(name, DataTypeUtil.valueOf(type));
}
- public Field(String name, String type, List<StructField> fields) {
+ public Field(String name, String type, List<Field> fields) {
this(name, DataTypeUtil.valueOf(type), fields);
}
- public Field(String name, DataType type, List<StructField> fields) {
+ public Field(String name, DataType type, List<Field> fields) {
this.name = name;
this.type = type;
this.children = fields;
@@ -91,11 +91,11 @@ public class Field {
return type;
}
- public List<StructField> getChildren() {
+ public List<Field> getChildren() {
return children;
}
- public void setChildren(List<StructField> children) {
+ public void setChildren(List<Field> children) {
this.children = children;
}
@@ -150,4 +150,5 @@ public class Field {
public void setColumnComment(String columnComment) {
this.columnComment = columnComment;
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
index c9622e1..075ae71 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.sdk.file;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -44,7 +45,7 @@ import org.apache.commons.lang.StringUtils;
*/
@InterfaceAudience.User
@InterfaceStability.Unstable
-public class Schema {
+public class Schema implements Serializable {
private Field[] fields;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f10122a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java
new file mode 100644
index 0000000..0472b75
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/store/CarbonStore.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.store;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.sdk.store.descriptor.LoadDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.ScanDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableDescriptor;
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier;
+import org.apache.carbondata.sdk.store.exception.CarbonException;
+
+/**
+ * Public Interface of CarbonStore
+ */
+@InterfaceAudience.User
+@InterfaceStability.Unstable
+public interface CarbonStore extends Closeable {
+
+ ////////////////////////////////////////////////////////////////////
+ ///// Metadata Operation /////
+ ////////////////////////////////////////////////////////////////////
+
+ /**
+ * Create a Table
+ * @param descriptor descriptor for create table operation
+ * @throws CarbonException if any error occurs
+ */
+ void createTable(TableDescriptor descriptor) throws CarbonException;
+
+ /**
+ * Drop a Table, and remove all data in it
+ * @param table table identifier
+ * @throws CarbonException if any error occurs
+ */
+ void dropTable(TableIdentifier table) throws CarbonException;
+
+ /**
+ * @return all tables created
+ * @throws CarbonException if any error occurs
+ */
+ List<TableDescriptor> listTable() throws CarbonException;
+
+ /**
+ * Return table descriptor by specified identifier
+ * @param table table identifier
+ * @return table descriptor
+ * @throws CarbonException if any error occurs
+ */
+ TableDescriptor getDescriptor(TableIdentifier table) throws CarbonException;
+
+ /**
+ * Alter table operation
+ * @param table table identifier
+ * @param newTable new table descriptor to alter to
+ * @throws CarbonException if any error occurs
+ */
+ void alterTable(TableIdentifier table, TableDescriptor newTable) throws CarbonException;
+
+
+ ////////////////////////////////////////////////////////////////////
+ ///// Write Operation /////
+ ////////////////////////////////////////////////////////////////////
+
+ /**
+ * Trigger a Load into the table specified by load descriptor
+ * @param load descriptor for load operation
+ * @throws CarbonException if any error occurs
+ */
+ void loadData(LoadDescriptor load) throws CarbonException;
+
+ /**
+ * Return true if this table has a primary key defined when the table was created using
+ * {@link #createTable(TableDescriptor)}
+ *
+ * For such table, upsert, delete and lookup is supported
+ *
+ * @return true if this table has primary key.
+ */
+ default boolean isPrimaryKeyDefined(TableIdentifier identifier) {
+ return false;
+ }
+
+ /**
+ * Insert a batch of rows if the key does not exist, otherwise update the row
+ * @param row rows to be upserted
+ * @param schema schema of the input row (fields without the primary key)
+ * @throws CarbonException if any error occurs
+ */
+ void upsert(Iterator<KeyedRow> row, StructType schema) throws CarbonException;
+
+ /**
+ * Delete a batch of rows
+ * @param keys keys to be deleted
+ * @throws CarbonException if any error occurs
+ */
+ void delete(Iterator<PrimaryKey> keys) throws CarbonException;
+
+
+ ////////////////////////////////////////////////////////////////////
+ ///// Read Operation /////
+ ////////////////////////////////////////////////////////////////////
+
+ /**
+ * Scan the specified table and return matched rows
+ *
+ * @param select descriptor for scan operation
+ * @return matched rows
+ * @throws CarbonException if any error occurs
+ */
+ List<CarbonRow> scan(ScanDescriptor select) throws CarbonException;
+
+ /**
+ * Lookup and return a row with the specified primary key
+ * @param key key to lookup
+ * @return matched row for the specified key
+ * @throws CarbonException if any error occurs
+ */
+ Row lookup(PrimaryKey key) throws CarbonException;
+
+ /**
+ * Lookup by filter expression and return a list of matched row
+ *
+ * @param tableIdentifier table identifier
+ * @param filterExpression filter expression, like "col3 = 1"
+ * @return matched row for the specified filter
+ * @throws CarbonException if any error occurs
+ */
+ List<Row> lookup(TableIdentifier tableIdentifier, String filterExpression) throws CarbonException;
+}